Skip to content

Commit

Permalink
rpc: index block events to support block event queries (bp #6226) (#6…
Browse files Browse the repository at this point in the history
…261)
  • Loading branch information
tnasu committed Dec 14, 2021
1 parent 3a768b1 commit ae15fa1
Show file tree
Hide file tree
Showing 28 changed files with 2,269 additions and 185 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.15
require (
github.com/BurntSushi/toml v0.3.1
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d
github.com/Workiva/go-datastructures v1.0.52
github.com/Workiva/go-datastructures v1.0.2
github.com/btcsuite/btcd v0.21.0-beta
github.com/btcsuite/btcutil v1.0.2
github.com/confio/ics23/go v0.6.3
Expand All @@ -15,6 +15,7 @@ require (
github.com/go-logfmt/logfmt v0.5.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.4.3
github.com/google/orderedcode v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/gtank/merlin v0.1.1
github.com/herumi/bls-eth-go-binary v0.0.0-20200923072303-32b29e5d8cbf
Expand Down
931 changes: 931 additions & 0 deletions go.sum

Large diffs are not rendered by default.

39 changes: 35 additions & 4 deletions light/proxy/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
"commit": rpcserver.NewRPCFunc(makeCommitFunc(c), "height"),
"tx": rpcserver.NewRPCFunc(makeTxFunc(c), "hash,prove"),
"tx_search": rpcserver.NewRPCFunc(makeTxSearchFunc(c), "query,prove,page,per_page,order_by"),
"block_search": rpcserver.NewRPCFunc(makeBlockSearchFunc(c), "query,page,per_page,order_by"),
"validators": rpcserver.NewRPCFunc(makeValidatorsFunc(c), "height,page,per_page"),
"voters": rpcserver.NewRPCFunc(makeVotersFunc(c), "height,page,per_page"),
"dump_consensus_state": rpcserver.NewRPCFunc(makeDumpConsensusStateFunc(c), ""),
Expand Down Expand Up @@ -132,16 +133,46 @@ func makeTxFunc(c *lrpc.Client) rpcTxFunc {
}
}

type rpcTxSearchFunc func(ctx *rpctypes.Context, query string, prove bool,
page, perPage *int, orderBy string) (*ctypes.ResultTxSearch, error)
type rpcTxSearchFunc func(
ctx *rpctypes.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*ctypes.ResultTxSearch, error)

func makeTxSearchFunc(c *lrpc.Client) rpcTxSearchFunc {
return func(ctx *rpctypes.Context, query string, prove bool, page, perPage *int, orderBy string) (
*ctypes.ResultTxSearch, error) {
return func(
ctx *rpctypes.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*ctypes.ResultTxSearch, error) {
return c.TxSearch(ctx.Context(), query, prove, page, perPage, orderBy)
}
}

type rpcBlockSearchFunc func(
ctx *rpctypes.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error)

func makeBlockSearchFunc(c *lrpc.Client) rpcBlockSearchFunc {
return func(
ctx *rpctypes.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error) {
return c.BlockSearch(ctx.Context(), query, page, perPage, orderBy)
}
}

type rpcValidatorsFunc func(ctx *rpctypes.Context, height *int64,
page, perPage *int) (*ctypes.ResultValidators, error)

Expand Down
33 changes: 25 additions & 8 deletions light/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ type LightClient interface {
TrustedLightBlock(height int64) (*types.LightBlock, error)
}

var _ rpcclient.Client = (*Client)(nil)

// Client is an RPC client, which uses light#Client to verify data (if it can
// be proved!). merkle.DefaultProofRuntime is used to verify values returned by
// ABCIQuery.
// be proved). Note, merkle.DefaultProofRuntime is used to verify values
// returned by ABCI#Query.
type Client struct {
service.BaseService

next rpcclient.Client
lc LightClient
// Proof runtime used to verify values returned by ABCIQuery

// proof runtime used to verify values returned by ABCIQuery
prt *merkle.ProofRuntime
keyPathFn KeyPathFunc
}
Expand Down Expand Up @@ -448,11 +451,25 @@ func (c *Client) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.Resul
return res, res.Proof.Validate(l.DataHash)
}

func (c *Client) TxSearch(ctx context.Context, query string, prove bool, page, perPage *int, orderBy string) (
*ctypes.ResultTxSearch, error) {
func (c *Client) TxSearch(
ctx context.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*ctypes.ResultTxSearch, error) {
return c.next.TxSearch(ctx, query, prove, page, perPage, orderBy)
}

func (c *Client) BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error) {
return c.next.BlockSearch(ctx, query, page, perPage, orderBy)
}

// Validators fetches and verifies validators.
//
// WARNING: only full validator sets are verified (when length of validators is
Expand All @@ -462,8 +479,9 @@ func (c *Client) Validators(
height *int64,
pagePtr, perPagePtr *int,
) (*ctypes.ResultValidators, error) {
// Update the light client if we're behind and retrieve the light block at the requested height
// or at the latest height if no height is provided.

// Update the light client if we're behind and retrieve the light block at the
// requested height or at the latest height if no height is provided.
l, err := c.updateLightClientIfNeededTo(ctx, height)
if err != nil {
return nil, err
Expand All @@ -477,7 +495,6 @@ func (c *Client) Validators(
}

skipCount := validateSkipCount(page, perPage)

v := l.ValidatorSet.Validators[skipCount : skipCount+tmmath.MinInt(perPage, totalCount-skipCount)]

return &ctypes.ResultValidators{
Expand Down
47 changes: 33 additions & 14 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/rs/cors"

dbm "github.com/line/tm-db/v2"
pdbm "github.com/line/tm-db/v2/prefixdb"

abci "github.com/line/ostracon/abci/types"
bcv0 "github.com/line/ostracon/blockchain/v0"
Expand All @@ -40,6 +41,9 @@ import (
grpccore "github.com/line/ostracon/rpc/grpc"
rpcserver "github.com/line/ostracon/rpc/jsonrpc/server"
sm "github.com/line/ostracon/state"
"github.com/line/ostracon/state/indexer"
blockidxkv "github.com/line/ostracon/state/indexer/block/kv"
blockidxnull "github.com/line/ostracon/state/indexer/block/null"
"github.com/line/ostracon/state/txindex"
"github.com/line/ostracon/state/txindex/kv"
"github.com/line/ostracon/state/txindex/null"
Expand Down Expand Up @@ -133,7 +137,7 @@ func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
type Option func(*Node)

// Temporary interface for switching to fast sync, we should get rid of v0 and v1 reactors.
// See: https://github.com/tendermint/tendermint/issues/4595
// See: https://github.com/line/ostracon/issues/4595
type fastSyncReactor interface {
SwitchToFastSync(sm.State) error
}
Expand Down Expand Up @@ -210,6 +214,7 @@ type Node struct {
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
blockIndexer indexer.BlockIndexer
indexerService *txindex.IndexerService
prometheusSrv *http.Server
}
Expand Down Expand Up @@ -248,27 +253,40 @@ func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
return eventBus, nil
}

func createAndStartIndexerService(config *cfg.Config, dbProvider DBProvider,
eventBus *types.EventBus, logger log.Logger) (*txindex.IndexerService, txindex.TxIndexer, error) {
func createAndStartIndexerService(
config *cfg.Config,
dbProvider DBProvider,
eventBus *types.EventBus,
logger log.Logger,
) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) {

var (
txIndexer txindex.TxIndexer
blockIndexer indexer.BlockIndexer
)

var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

txIndexer = kv.NewTxIndex(store)
blockIndexer = blockidxkv.New(pdbm.NewDB(store, []byte("block_events")))
default:
txIndexer = &null.TxIndex{}
blockIndexer = &blockidxnull.BlockerIndexer{}
}

indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))

if err := indexerService.Start(); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
return indexerService, txIndexer, nil

return indexerService, txIndexer, blockIndexer, nil
}

func doHandshake(
Expand Down Expand Up @@ -559,7 +577,7 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
// https://github.com/line/ostracon/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
RecvBufSize: config.P2P.PexRecvBufSize,
Expand Down Expand Up @@ -670,8 +688,7 @@ func NewNode(config *cfg.Config,
return nil, err
}

// Transaction indexing
indexerService, txIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger)
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -763,7 +780,7 @@ func NewNode(config *cfg.Config,
// Set up state sync reactor, and schedule a sync if requested.
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
// https://github.com/line/ostracon/issues/4644
stateSyncReactor := statesync.NewReactor(proxyApp.Snapshot(), proxyApp.Query(),
config.P2P.RecvAsync, config.P2P.StatesyncRecvBufSize)
stateSyncReactor.SetLogger(logger.With("module", "statesync"))
Expand Down Expand Up @@ -848,6 +865,7 @@ func NewNode(config *cfg.Config,
proxyApp: proxyApp,
txIndexer: txIndexer,
indexerService: indexerService,
blockIndexer: blockIndexer,
eventBus: eventBus,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)
Expand Down Expand Up @@ -1004,6 +1022,7 @@ func (n *Node) ConfigureRPC() error {
PubKey: pubKey,
GenDoc: n.genesisDoc,
TxIndexer: n.txIndexer,
BlockIndexer: n.blockIndexer,
ConsensusReactor: n.consensusReactor,
EventBus: n.eventBus,
Mempool: n.mempool,
Expand Down Expand Up @@ -1036,7 +1055,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
config.IdleTimeout = n.config.RPC.IdleTimeout
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
// See https://github.com/line/ostracon/issues/3435
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
Expand Down Expand Up @@ -1115,7 +1134,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
config.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
// See https://github.com/line/ostracon/issues/3435
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
Expand Down
35 changes: 33 additions & 2 deletions rpc/client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,24 +468,55 @@ func (c *baseRPCClient) TxSearch(
page,
perPage *int,
orderBy string,
) (
*ctypes.ResultTxSearch, error) {
) (*ctypes.ResultTxSearch, error) {

result := new(ctypes.ResultTxSearch)
params := map[string]interface{}{
"query": query,
"prove": prove,
"order_by": orderBy,
}

if page != nil {
params["page"] = page
}
if perPage != nil {
params["per_page"] = perPage
}

_, err := c.caller.Call(ctx, "tx_search", params, result)
if err != nil {
return nil, err
}

return result, nil
}

func (c *baseRPCClient) BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error) {

result := new(ctypes.ResultBlockSearch)
params := map[string]interface{}{
"query": query,
"order_by": orderBy,
}

if page != nil {
params["page"] = page
}
if perPage != nil {
params["per_page"] = perPage
}

_, err := c.caller.Call(ctx, "block_search", params, result)
if err != nil {
return nil, err
}

return result, nil
}

Expand Down
21 changes: 19 additions & 2 deletions rpc/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,25 @@ type SignClient interface {
Validators(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error)
Voters(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultVoters, error)
Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error)
TxSearch(ctx context.Context, query string, prove bool, page, perPage *int,
orderBy string) (*ctypes.ResultTxSearch, error)

// TxSearch defines a method to search for a paginated set of transactions by
// DeliverTx event search criteria.
TxSearch(
ctx context.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*ctypes.ResultTxSearch, error)

// BlockSearch defines a method to search for a paginated set of blocks by
// BeginBlock and EndBlock event search criteria.
BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error)
}

// HistoryClient provides access to data from genesis to now in large chunks.
Expand Down
Loading

0 comments on commit ae15fa1

Please sign in to comment.