Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schema): indexing API #20647

Merged
merged 41 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
76f6f32
feat: indexer base types
aaronc Jun 11, 2024
63aeb85
WIP on tests
aaronc Jun 11, 2024
28ed78b
feat(indexer/base): add Manager and DecodeableModule
aaronc Jun 11, 2024
21b787a
WIP
aaronc Jun 11, 2024
b2e57cd
update listener
aaronc Jun 12, 2024
2313851
docs
aaronc Jun 12, 2024
e2fcb60
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jun 17, 2024
6466ac5
updates
aaronc Jun 17, 2024
c87f020
updates
aaronc Jun 17, 2024
4c793d7
docs
aaronc Jun 18, 2024
2ba1d7c
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jun 18, 2024
1088827
feat(schema/appdata)!: refactoring and packet support
aaronc Jul 3, 2024
812c6ee
docs
aaronc Jul 3, 2024
77ec7e7
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/schem…
aaronc Jul 3, 2024
bca3bb0
feat(schema/appdata): async listener mux'ing
aaronc Jul 3, 2024
26734fb
tests
aaronc Jul 3, 2024
d5b55a9
testing WIP
aaronc Jul 3, 2024
ac04ca0
add module filter
aaronc Jul 4, 2024
edfd4c6
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/schem…
aaronc Jul 4, 2024
09b0c97
feat(schema): decoding middleware
aaronc Jul 4, 2024
1a719bd
Merge branch 'main' into aaronc/indexer-base-manager
aaronc Jul 4, 2024
da90cdc
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jul 4, 2024
7be101e
WIP
aaronc Jul 4, 2024
8b32520
Merge branch 'aaronc/schema-decoding' into aaronc/indexer-base-manager
aaronc Jul 4, 2024
d98e7f7
Merge branch 'aaronc/schema-appdata-utils' into aaronc/indexer-base-m…
aaronc Jul 4, 2024
9600152
WIP
aaronc Jul 4, 2024
3a68b64
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jul 5, 2024
23ff67f
add manager API
aaronc Jul 5, 2024
d6cf655
update config
aaronc Jul 5, 2024
b5ca35a
add README, refactor
aaronc Jul 5, 2024
595df7c
add baseapp and simapp integration
aaronc Jul 5, 2024
40e4437
docs updates
aaronc Jul 5, 2024
afce42b
docs
aaronc Jul 5, 2024
6c40979
update go.mod
aaronc Jul 5, 2024
86ec5bd
fixes
aaronc Jul 5, 2024
8ac5625
Merge branch 'main' into aaronc/indexer-base-manager
aaronc Jul 8, 2024
cbc8a95
lint fix
aaronc Jul 9, 2024
047ffb4
Merge remote-tracking branch 'origin/aaronc/indexer-base-manager' int…
aaronc Jul 9, 2024
7d42710
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jul 9, 2024
7fc0c03
go mod tidy
aaronc Jul 9, 2024
199d06b
simapp build fix
aaronc Jul 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions indexer/base/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ The basic types for specifying index sources, targets and decoders are provided
```mermaid
sequenceDiagram
actor Source
participant Indexer
Source ->> Indexer: Initialize
participant Indexer
Source -->> Indexer: InitializeModuleSchema
Source ->> Indexer: Initialize
loop Block
Source ->> Indexer: StartBlock
Source ->> Indexer: OnBlockHeader
Expand All @@ -25,7 +25,7 @@ sequenceDiagram
end
```

`Initialize` must be called before any other method and should only be invoked once. `InitializeModuleSchema` should be called at most once for every module with logical data.
`InitializeModuleSchema` should be called at most once for every module with logical data and all calls to should happen even before `Initialize` is called. After that `Initialize` MUST be called before any other method and should only be invoked once.

Sources will generally only call `InitializeModuleSchema` and `OnObjectUpdate` if they have native logical decoding capabilities. Usually, the indexer framework will provide this functionality based on `OnKVPair` data and `IndexableModule` implementations.

Expand Down
70 changes: 68 additions & 2 deletions indexer/base/decoder.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package indexerbase

import "sort"

// DecodableModule is an interface that modules can implement to provide a ModuleDecoder.
// Usually these modules would also implement appmodule.AppModule, but that is not included
// to keep this package free of any dependencies.
type DecodableModule interface {

// ModuleDecoder returns a ModuleDecoder for the module.
ModuleDecoder() (ModuleDecoder, error)
}
Expand All @@ -23,4 +25,68 @@ type ModuleDecoder struct {
// If the KV-pair doesn't represent an object update, the function should return false
// as the second return value. Error should only be non-nil when the decoder expected
// to parse a valid update and was unable to.
type KVDecoder = func(key, value []byte) (ObjectUpdate, bool, error)
type KVDecoder = func(key, value []byte, deleted bool) (ObjectUpdate, bool, error)

type DecoderResolver interface {
// Iterate iterates over all module decoders which should be initialized at startup.
Iterate(func(string, ModuleDecoder) error) error

// LookupDecoder allows for resolving decoders dynamically. For instance, some module-like
// things may come into existence dynamically (like x/accounts or EVM or WASM contracts).
// The first time the manager sees one of these appearing in KV-store writes, it will
// lookup a decoder for it and cache it for future use. The manager will also perform
// a catch-up sync before passing any new writes to ensure that all historical state has
// been synced if there is any This check will only happen the first time a module is seen
// by the manager in a given process (a process restart will cause this check to happen again).
LookupDecoder(moduleName string) (decoder ModuleDecoder, found bool, err error)
}

type moduleSetDecoderResolver struct {
moduleSet map[string]interface{}
}

// ModuleSetDecoderResolver returns DecoderResolver that will discover modules implementing
// DecodeableModule in the provided module set.
func ModuleSetDecoderResolver(moduleSet map[string]interface{}) DecoderResolver {
return &moduleSetDecoderResolver{
moduleSet: moduleSet,
}
}

func (a moduleSetDecoderResolver) Iterate(f func(string, ModuleDecoder) error) error {
keys := make([]string, 0, len(a.moduleSet))
for k := range a.moduleSet {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
module := a.moduleSet[k]
dm, ok := any(module).(DecodableModule)
if ok {
decoder, err := dm.ModuleDecoder()
if err != nil {
return err
}
err = f(k, decoder)
if err != nil {
return err
}
}
}
return nil
}

func (a moduleSetDecoderResolver) LookupDecoder(moduleName string) (ModuleDecoder, bool, error) {
mod, ok := a.moduleSet[moduleName]
if !ok {
return ModuleDecoder{}, false, nil
}

dm, ok := any(mod).(DecodableModule)
if !ok {
return ModuleDecoder{}, false, nil
}

decoder, err := dm.ModuleDecoder()
return decoder, true, err
}
31 changes: 27 additions & 4 deletions indexer/base/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ type Listener struct {
// The lastBlockPersisted return value should be the last block height the listener persisted if it is
// persisting block data, 0 if it is not interested in persisting block data, or -1 if it is
// persisting block data but has not persisted any data yet. This check allows the indexer
// framework to ensure that the listener has not missed blocks.
// framework to ensure that the listener has not missed blocks. Data sources MUST call
// initialize before any other method is called, otherwise, no data will be processed.
Initialize func(InitializationData) (lastBlockPersisted int64, err error)

// StartBlock is called at the beginning of processing a block.
Expand All @@ -31,11 +32,12 @@ type Listener struct {
OnEvent func(EventData) error

// OnKVPair is called when a key-value has been written to the store for a given module.
OnKVPair func(moduleName string, key, value []byte, delete bool) error
OnKVPair func(KVPairData) error

// Commit is called when state is committed, usually at the end of a block. Any
// indexers should commit their data when this is called and return an error if
// they are unable to commit.
// they are unable to commit. Data sources MUST call Commit when data is committed,
// otherwise it should be assumed that indexers have not persisted their state.
Commit func() error

// InitializeModuleSchema should be called whenever the blockchain process starts OR whenever
Expand All @@ -49,7 +51,7 @@ type Listener struct {
// OnObjectUpdate is called whenever an object is updated in a module's state. This is only called
// when logical data is available. It should be assumed that the same data in raw form
// is also passed to OnKVPair.
OnObjectUpdate func(module string, update ObjectUpdate) error
OnObjectUpdate func(ObjectUpdateData) error
}

// InitializationData represents initialization data that is passed to a listener.
Expand Down Expand Up @@ -117,3 +119,24 @@ type ToBytes = func() ([]byte, error)

// ToJSON is a function that lazily returns the JSON representation of data.
type ToJSON = func() (json.RawMessage, error)

// KVPairData represents key-value pair data that is passed to a listener.
type KVPairData struct {
// ModuleName is the name of the module that the key-value pair belongs to.
ModuleName string

// Key is the key of the key-value pair.
Key []byte

// Value is the value of the key-value pair. It should be ignored when Delete is true.
Value []byte

// Delete is a flag that indicates that the key-value pair was deleted. If it is false,
// then it is assumed that this has been a set operation.
Delete bool
}

type ObjectUpdateData struct {
ModuleName string
Update ObjectUpdate
}
20 changes: 20 additions & 0 deletions indexer/base/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package indexerbase

// Logger is the expected interface of loggers used in the indexer.
type Logger interface {
// Info takes a message and a set of key/value pairs and logs with level INFO.
// The key of the tuple must be a string.
Info(msg string, keyVals ...any)

// Warn takes a message and a set of key/value pairs and logs with level WARN.
// The key of the tuple must be a string.
Warn(msg string, keyVals ...any)

// Error takes a message and a set of key/value pairs and logs with level ERR.
// The key of the tuple must be a string.
Error(msg string, keyVals ...any)

// Debug takes a message and a set of key/value pairs and logs with level DEBUG.
// The key of the tuple must be a string.
Debug(msg string, keyVals ...any)
}
Loading
Loading