Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schema): indexing API #20647

Merged
merged 41 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
76f6f32
feat: indexer base types
aaronc Jun 11, 2024
63aeb85
WIP on tests
aaronc Jun 11, 2024
28ed78b
feat(indexer/base): add Manager and DecodeableModule
aaronc Jun 11, 2024
21b787a
WIP
aaronc Jun 11, 2024
b2e57cd
update listener
aaronc Jun 12, 2024
2313851
docs
aaronc Jun 12, 2024
e2fcb60
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jun 17, 2024
6466ac5
updates
aaronc Jun 17, 2024
c87f020
updates
aaronc Jun 17, 2024
4c793d7
docs
aaronc Jun 18, 2024
2ba1d7c
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jun 18, 2024
1088827
feat(schema/appdata)!: refactoring and packet support
aaronc Jul 3, 2024
812c6ee
docs
aaronc Jul 3, 2024
77ec7e7
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/schem…
aaronc Jul 3, 2024
bca3bb0
feat(schema/appdata): async listener mux'ing
aaronc Jul 3, 2024
26734fb
tests
aaronc Jul 3, 2024
d5b55a9
testing WIP
aaronc Jul 3, 2024
ac04ca0
add module filter
aaronc Jul 4, 2024
edfd4c6
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/schem…
aaronc Jul 4, 2024
09b0c97
feat(schema): decoding middleware
aaronc Jul 4, 2024
1a719bd
Merge branch 'main' into aaronc/indexer-base-manager
aaronc Jul 4, 2024
da90cdc
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jul 4, 2024
7be101e
WIP
aaronc Jul 4, 2024
8b32520
Merge branch 'aaronc/schema-decoding' into aaronc/indexer-base-manager
aaronc Jul 4, 2024
d98e7f7
Merge branch 'aaronc/schema-appdata-utils' into aaronc/indexer-base-m…
aaronc Jul 4, 2024
9600152
WIP
aaronc Jul 4, 2024
3a68b64
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jul 5, 2024
23ff67f
add manager API
aaronc Jul 5, 2024
d6cf655
update config
aaronc Jul 5, 2024
b5ca35a
add README, refactor
aaronc Jul 5, 2024
595df7c
add baseapp and simapp integration
aaronc Jul 5, 2024
40e4437
docs updates
aaronc Jul 5, 2024
afce42b
docs
aaronc Jul 5, 2024
6c40979
update go.mod
aaronc Jul 5, 2024
86ec5bd
fixes
aaronc Jul 5, 2024
8ac5625
Merge branch 'main' into aaronc/indexer-base-manager
aaronc Jul 8, 2024
cbc8a95
lint fix
aaronc Jul 9, 2024
047ffb4
Merge remote-tracking branch 'origin/aaronc/indexer-base-manager' int…
aaronc Jul 9, 2024
7d42710
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jul 9, 2024
7fc0c03
go mod tidy
aaronc Jul 9, 2024
199d06b
simapp build fix
aaronc Jul 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package baseapp

import (
"context"
"fmt"
"sort"
"strings"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/spf13/cast"

"cosmossdk.io/schema"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/schema/decoding"
"cosmossdk.io/schema/indexer"
"cosmossdk.io/store/streaming"
storetypes "cosmossdk.io/store/types"

Expand All @@ -22,6 +28,31 @@ const (
StreamingABCIStopNodeOnErrTomlKey = "stop-node-on-err"
)

// EnableIndexer enables the built-in indexer with the provided options (usually from the app.toml indexer key),
// kv-store keys, and app modules. Using the built-in indexer framework is mutually exclusive from using other
// types of streaming listeners.
func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*storetypes.KVStoreKey, appModules map[string]any) error {
listener, err := indexer.StartManager(indexer.ManagerOptions{
Config: indexerOpts,
Resolver: decoding.ModuleSetDecoderResolver(appModules),
SyncSource: nil,
Logger: app.logger.With("module", "indexer"),
})
if err != nil {
return err
}

exposedKeys := exposeStoreKeysSorted([]string{"*"}, keys)
app.cms.AddListeners(exposedKeys)

app.streamingManager = storetypes.StreamingManager{
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener}},
StopNodeOnErr: true,
}

return nil
}

// RegisterStreamingServices registers streaming services with the BaseApp.
func (app *BaseApp) RegisterStreamingServices(appOpts servertypes.AppOptions, keys map[string]*storetypes.KVStoreKey) error {
// register streaming services
Expand Down Expand Up @@ -110,3 +141,51 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore

return exposeStoreKeys
}

type listenerWrapper struct {
listener appdata.Listener
}

func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
if p.listener.StartBlock != nil {
err := p.listener.StartBlock(appdata.StartBlockData{
Height: uint64(req.Height),
})
if err != nil {
return err
}
}

//// TODO txs, events

return nil
}

func (p listenerWrapper) ListenCommit(ctx context.Context, res abci.CommitResponse, changeSet []*storetypes.StoreKVPair) error {
if cb := p.listener.OnKVPair; cb != nil {
updates := make([]appdata.ModuleKVPairUpdate, len(changeSet))
for i, pair := range changeSet {
updates[i] = appdata.ModuleKVPairUpdate{
ModuleName: pair.StoreKey,
Update: schema.KVPairUpdate{
Key: pair.Key,
Value: pair.Value,
Delete: pair.Delete,
},
}
}
err := cb(appdata.KVPairData{Updates: updates})
if err != nil {
return err
}
}

if p.listener.Commit != nil {
err := p.listener.Commit(appdata.CommitData{})
if err != nil {
return err
}
}

return nil
}
6 changes: 5 additions & 1 deletion client/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ require (
pgregory.net/rapid v1.1.0 // indirect
)

require github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
require (
cosmossdk.io/schema v0.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
)

replace github.com/cosmos/cosmos-sdk => ./../../

Expand All @@ -181,6 +184,7 @@ replace (
cosmossdk.io/core/testing => ../../core/testing
cosmossdk.io/depinject => ./../../depinject
cosmossdk.io/log => ./../../log
cosmossdk.io/schema => ./../../schema
cosmossdk.io/store => ./../../store
cosmossdk.io/x/accounts => ./../../x/accounts
cosmossdk.io/x/auth => ./../../x/auth
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
cosmossdk.io/errors v1.0.1
cosmossdk.io/log v1.3.1
cosmossdk.io/math v1.3.0
cosmossdk.io/schema v0.0.0
cosmossdk.io/store v1.1.1-0.20240418092142-896cdf1971bc
cosmossdk.io/x/auth v0.0.0-00010101000000-000000000000
cosmossdk.io/x/bank v0.0.0-20240226161501-23359a0b6d91
Expand Down Expand Up @@ -189,6 +190,7 @@ replace (
cosmossdk.io/core/testing => ./core/testing
cosmossdk.io/depinject => ./depinject
cosmossdk.io/log => ./log
cosmossdk.io/schema => ./schema
cosmossdk.io/store => ./store
cosmossdk.io/x/accounts => ./x/accounts
cosmossdk.io/x/auth => ./x/auth
Expand Down
66 changes: 66 additions & 0 deletions schema/decoding/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package decoding

import (
"sort"

"cosmossdk.io/schema"
)

// DecoderResolver is an interface that allows indexers to discover and use module decoders.
type DecoderResolver interface {
// IterateAll iterates over all available module decoders.
IterateAll(func(moduleName string, cdc schema.ModuleCodec) error) error

// LookupDecoder looks up a specific module decoder.
LookupDecoder(moduleName string) (decoder schema.ModuleCodec, found bool, err error)
}

// ModuleSetDecoderResolver returns DecoderResolver that will discover modules implementing
// DecodeableModule in the provided module set.
func ModuleSetDecoderResolver(moduleSet map[string]interface{}) DecoderResolver {
return &moduleSetDecoderResolver{
moduleSet: moduleSet,
}
}

type moduleSetDecoderResolver struct {
moduleSet map[string]interface{}
}

func (a moduleSetDecoderResolver) IterateAll(f func(string, schema.ModuleCodec) error) error {
keys := make([]string, 0, len(a.moduleSet))
for k := range a.moduleSet {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
module := a.moduleSet[k]
dm, ok := module.(schema.HasModuleCodec)
if ok {
decoder, err := dm.ModuleCodec()
if err != nil {
return err
}
err = f(k, decoder)
if err != nil {
return err
}
}
}
return nil
}

func (a moduleSetDecoderResolver) LookupDecoder(moduleName string) (schema.ModuleCodec, bool, error) {
mod, ok := a.moduleSet[moduleName]
if !ok {
return schema.ModuleCodec{}, false, nil
}

dm, ok := mod.(schema.HasModuleCodec)
if !ok {
return schema.ModuleCodec{}, false, nil
}

decoder, err := dm.ModuleCodec()
return decoder, true, err
}
124 changes: 124 additions & 0 deletions schema/decoding/resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package decoding

import (
"fmt"
"testing"

"cosmossdk.io/schema"
)

type modA struct{}

func (m modA) ModuleCodec() (schema.ModuleCodec, error) {
return schema.ModuleCodec{
Schema: schema.ModuleSchema{ObjectTypes: []schema.ObjectType{{Name: "A"}}},
}, nil
}

type modB struct{}

func (m modB) ModuleCodec() (schema.ModuleCodec, error) {
return schema.ModuleCodec{
Schema: schema.ModuleSchema{ObjectTypes: []schema.ObjectType{{Name: "B"}}},
}, nil
}

type modC struct{}

var moduleSet = map[string]interface{}{
"modA": modA{},
"modB": modB{},
"modC": modC{},
}

var resolver = ModuleSetDecoderResolver(moduleSet)

func TestModuleSetDecoderResolver_IterateAll(t *testing.T) {
objectTypes := map[string]bool{}
err := resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
objectTypes[cdc.Schema.ObjectTypes[0].Name] = true
return nil
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if len(objectTypes) != 2 {
t.Fatalf("expected 2 object types, got %d", len(objectTypes))
}

if !objectTypes["A"] {
t.Fatalf("expected object type A")
}

if !objectTypes["B"] {
t.Fatalf("expected object type B")
}
}

func TestModuleSetDecoderResolver_LookupDecoder(t *testing.T) {
decoder, found, err := resolver.LookupDecoder("modA")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !found {
t.Fatalf("expected to find decoder for modA")
}

if decoder.Schema.ObjectTypes[0].Name != "A" {
t.Fatalf("expected object type A, got %s", decoder.Schema.ObjectTypes[0].Name)
}

decoder, found, err = resolver.LookupDecoder("modB")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !found {
t.Fatalf("expected to find decoder for modB")
}

if decoder.Schema.ObjectTypes[0].Name != "B" {
t.Fatalf("expected object type B, got %s", decoder.Schema.ObjectTypes[0].Name)
}

decoder, found, err = resolver.LookupDecoder("modC")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if found {
t.Fatalf("expected not to find decoder")
}

decoder, found, err = resolver.LookupDecoder("modD")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if found {
t.Fatalf("expected not to find decoder")
}
}

type modD struct{}

func (m modD) ModuleCodec() (schema.ModuleCodec, error) {
return schema.ModuleCodec{}, fmt.Errorf("an error")
}

func TestModuleSetDecoderResolver_IterateAll_Error(t *testing.T) {
resolver := ModuleSetDecoderResolver(map[string]interface{}{
"modD": modD{},
})
err := resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
if moduleName == "modD" {
t.Fatalf("expected error")
}
return nil
})
if err == nil {
t.Fatalf("expected error")
}
}
8 changes: 8 additions & 0 deletions schema/decoding/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package decoding

// SyncSource is an interface that allows indexers to start indexing modules with pre-existing state.
// It should generally be a wrapper around the key-value store.
type SyncSource interface {
// IterateAllKVPairs iterates over all key-value pairs for a given module.
IterateAllKVPairs(moduleName string, fn func(key, value []byte) error) error
}
15 changes: 15 additions & 0 deletions schema/indexer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Indexer Framework

# Defining an Indexer

Indexer implementations should be registered with the `indexer.Register` function with a unique type name. Indexers take the configuration options defined by `indexer.Config` which defines a common set of configuration options as well as indexer-specific options under the `config` sub-key. Indexers do not need to manage the common filtering options specified in `Config` - the indexer manager will manage these for the indexer. Indexer implementations just need to return a correct `InitResult` response.

# Integrating the Indexer Manager

The indexer manager should be used for managing all indexers and should be integrated directly with applications wishing to support indexing. The `StartManager` function is used to start the manager. The configuration options for the manager and all indexer targets should be passed as the ManagerOptions.Config field and should match the json structure of ManagerConfig. An example configuration section in `app.toml` might look like this:

```toml
[indexer.target.postgres]
type = "postgres"
config.database_url = "postgres://user:password@localhost:5432/dbname"
```
Loading
Loading