Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate gRPC queries into BaseApp #6335

Merged
merged 11 commits into from
Jun 4, 2020
91 changes: 58 additions & 33 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ func (app *BaseApp) halt() {
// Query implements the ABCI interface. It delegates to CommitMultiStore if it
// implements Queryable.
func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery {
// handle gRPC routes first rather than calling splitPath because '/' characters
// are used as part of gRPC paths
if grpcHandler := app.grpcQueryRouter.Route(req.Path); grpcHandler != nil {
return app.handleQueryGRPC(grpcHandler, req)
}

path := splitPath(req.Path)
if len(path) == 0 {
sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, "no query path provided"))
Expand All @@ -317,6 +323,53 @@ func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery {
return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, "unknown query path"))
}

func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req abci.RequestQuery) abci.ResponseQuery {
ctx, err := app.createQueryContext(req)
if err != nil {
return sdkerrors.QueryResult(err)
}

res, err := handler(ctx, req)
if err != nil {
res = sdkerrors.QueryResult(err)
res.Height = req.Height
return res
}

return res
}

func (app *BaseApp) createQueryContext(req abci.RequestQuery) (sdk.Context, error) {
// when a client did not provide a query height, manually inject the latest
if req.Height == 0 {
req.Height = app.LastBlockHeight()
}

if req.Height <= 1 && req.Prove {
return sdk.Context{},
sdkerrors.Wrap(
sdkerrors.ErrInvalidRequest,
"cannot query with proof when height <= 1; please provide a valid height",
)
}

cacheMS, err := app.cms.CacheMultiStoreWithVersion(req.Height)
if err != nil {
return sdk.Context{},
sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"failed to load state at height %d; %s (latest height: %d)", req.Height, err, app.LastBlockHeight(),
)
}

// cache wrap the commit-multistore for safety
ctx := sdk.NewContext(
cacheMS, app.checkState.ctx.BlockHeader(), true, app.logger,
).WithMinGasPrices(app.minGasPrices)

return ctx, nil
}

func handleQueryApp(app *BaseApp, path []string, req abci.RequestQuery) abci.ResponseQuery {
if len(path) >= 2 {
switch path[1] {
Expand Down Expand Up @@ -439,48 +492,20 @@ func handleQueryCustom(app *BaseApp, path []string, req abci.RequestQuery) abci.
return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no custom querier found for route %s", path[1]))
}

// when a client did not provide a query height, manually inject the latest
if req.Height == 0 {
req.Height = app.LastBlockHeight()
}

if req.Height <= 1 && req.Prove {
return sdkerrors.QueryResult(
sdkerrors.Wrap(
sdkerrors.ErrInvalidRequest,
"cannot query with proof when height <= 1; please provide a valid height",
),
)
}

cacheMS, err := app.cms.CacheMultiStoreWithVersion(req.Height)
ctx, err := app.createQueryContext(req)
if err != nil {
return sdkerrors.QueryResult(
sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"failed to load state at height %d; %s (latest height: %d)", req.Height, err, app.LastBlockHeight(),
),
)
return sdkerrors.QueryResult(err)
}

// cache wrap the commit-multistore for safety
ctx := sdk.NewContext(
cacheMS, app.checkState.ctx.BlockHeader(), true, app.logger,
).WithMinGasPrices(app.minGasPrices)

// Passes the rest of the path as an argument to the querier.
//
// For example, in the path "custom/gov/proposal/test", the gov querier gets
// []string{"proposal", "test"} as the path.
resBytes, err := querier(ctx, path[2:], req)
if err != nil {
space, code, log := sdkerrors.ABCIInfo(err, false)
return abci.ResponseQuery{
Code: code,
Codespace: space,
Log: log,
Height: req.Height,
}
res := sdkerrors.QueryResult(err)
res.Height = req.Height
return res
}

return abci.ResponseQuery{
Expand Down
41 changes: 24 additions & 17 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"runtime/debug"
"strings"

"github.com/gogo/protobuf/grpc"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log"
Expand Down Expand Up @@ -41,14 +43,15 @@ type (
// BaseApp reflects the ABCI application implementation.
type BaseApp struct { // nolint: maligned
// initialized on creation
logger log.Logger
name string // application name from abci.Info
db dbm.DB // common DB backend
cms sdk.CommitMultiStore // Main (uncached) state
storeLoader StoreLoader // function to handle store loading, may be overridden with SetStoreLoader()
router sdk.Router // handle any kind of message
queryRouter sdk.QueryRouter // router for redirecting query calls
txDecoder sdk.TxDecoder // unmarshal []byte into sdk.Tx
logger log.Logger
name string // application name from abci.Info
db dbm.DB // common DB backend
cms sdk.CommitMultiStore // Main (uncached) state
storeLoader StoreLoader // function to handle store loading, may be overridden with SetStoreLoader()
router sdk.Router // handle any kind of message
queryRouter sdk.QueryRouter // router for redirecting query calls
grpcQueryRouter *GRPCQueryRouter // router for redirecting gRPC query calls
txDecoder sdk.TxDecoder // unmarshal []byte into sdk.Tx

anteHandler sdk.AnteHandler // ante handler for fee and auth
initChainer sdk.InitChainer // initialize state with validators and state blob
Expand Down Expand Up @@ -101,15 +104,16 @@ func NewBaseApp(
name string, logger log.Logger, db dbm.DB, txDecoder sdk.TxDecoder, options ...func(*BaseApp),
) *BaseApp {
app := &BaseApp{
logger: logger,
name: name,
db: db,
cms: store.NewCommitMultiStore(db),
storeLoader: DefaultStoreLoader,
router: NewRouter(),
queryRouter: NewQueryRouter(),
txDecoder: txDecoder,
fauxMerkleMode: false,
logger: logger,
name: name,
db: db,
cms: store.NewCommitMultiStore(db),
storeLoader: DefaultStoreLoader,
router: NewRouter(),
queryRouter: NewQueryRouter(),
grpcQueryRouter: NewGRPCQueryRouter(),
txDecoder: txDecoder,
fauxMerkleMode: false,
}

for _, option := range options {
Expand Down Expand Up @@ -282,6 +286,9 @@ func (app *BaseApp) Router() sdk.Router {
// QueryRouter returns the QueryRouter of a BaseApp.
func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter }

// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() grpc.Server { return app.grpcQueryRouter }

// Seal seals a BaseApp. It prohibits any further modifications to a BaseApp.
func (app *BaseApp) Seal() { app.sealed = true }

Expand Down
36 changes: 36 additions & 0 deletions baseapp/baseapp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"testing"

"github.com/cosmos/cosmos-sdk/codec/testdata"

"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1506,6 +1508,40 @@ func TestQuery(t *testing.T) {
require.Equal(t, value, res.Value)
}

func TestGRPCQuery(t *testing.T) {
grpcQueryOpt := func(bapp *BaseApp) {
testdata.RegisterTestServiceServer(
bapp.GRPCQueryRouter(),
testServer{},
)
}

app := setupBaseApp(t, grpcQueryOpt)

app.InitChain(abci.RequestInitChain{})
header := abci.Header{Height: app.LastBlockHeight() + 1}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
app.Commit()

req := testdata.SayHelloRequest{Name: "foo"}
reqBz, err := req.Marshal()
require.NoError(t, err)

reqQuery := abci.RequestQuery{
Data: reqBz,
Path: "/cosmos_sdk.codec.v1.TestService/SayHello",
}

resQuery := app.Query(reqQuery)

require.Equal(t, abci.CodeTypeOK, resQuery.Code, resQuery)

var res testdata.SayHelloResponse
err = res.Unmarshal(resQuery.Value)
require.NoError(t, err)
require.Equal(t, "Hello foo!", res.Greeting)
}

// Test p2p filter queries
func TestP2PQuery(t *testing.T) {
addrPeerFilterOpt := func(bapp *BaseApp) {
Expand Down
14 changes: 7 additions & 7 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ import (

var protoCodec = encoding.GetCodec(proto.Name)

// GRPCRouter routes ABCI Query requests to GRPC handlers
type GRPCRouter struct {
// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
}

var _ gogogrpc.Server

// NewGRPCRouter creates a new GRPCRouter
func NewGRPCRouter() *GRPCRouter {
return &GRPCRouter{
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
routes: map[string]GRPCQueryHandler{},
}
}
Expand All @@ -34,7 +34,7 @@ type GRPCQueryHandler = func(ctx sdk.Context, req abci.RequestQuery) (abci.Respo

// Route returns the GRPCQueryHandler for a given query route path or nil
// if not found
func (qrt *GRPCRouter) Route(path string) GRPCQueryHandler {
func (qrt *GRPCQueryRouter) Route(path string) GRPCQueryHandler {
handler, found := qrt.routes[path]
if !found {
return nil
Expand All @@ -44,7 +44,7 @@ func (qrt *GRPCRouter) Route(path string) GRPCQueryHandler {

// RegisterService implements the gRPC Server.RegisterService method. sd is a gRPC
// service description, handler is an object which implements that gRPC service
func (qrt *GRPCRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) {
func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) {
// adds a top-level query handler based on the gRPC service name
for _, method := range sd.Methods {
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)
Expand Down
4 changes: 2 additions & 2 deletions baseapp/grpcrouter_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import (
// interfaces needed to register a query service server and create a query
// service client.
type QueryServiceTestHelper struct {
*GRPCRouter
*GRPCQueryRouter
ctx sdk.Context
}

// NewQueryServerTestHelper creates a new QueryServiceTestHelper that wraps
// the provided sdk.Context
func NewQueryServerTestHelper(ctx sdk.Context) *QueryServiceTestHelper {
return &QueryServiceTestHelper{GRPCRouter: NewGRPCRouter(), ctx: ctx}
return &QueryServiceTestHelper{GRPCQueryRouter: NewGRPCQueryRouter(), ctx: ctx}
}

// Invoke implements the grpc ClientConn.Invoke method
Expand Down
6 changes: 3 additions & 3 deletions baseapp/grpcrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ func (e testServer) SayHello(_ context.Context, request *testdata.SayHelloReques
var _ testdata.TestServiceServer = testServer{}

func TestGRPCRouter(t *testing.T) {
qr := NewGRPCRouter()
qr := NewGRPCQueryRouter()
testdata.RegisterTestServiceServer(qr, testServer{})
helper := &QueryServiceTestHelper{
GRPCRouter: qr,
ctx: sdk.Context{},
GRPCQueryRouter: qr,
ctx: sdk.Context{},
}
client := testdata.NewTestServiceClient(helper)

Expand Down