Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schema): indexing API #20647

Merged
merged 41 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
76f6f32
feat: indexer base types
aaronc Jun 11, 2024
63aeb85
WIP on tests
aaronc Jun 11, 2024
28ed78b
feat(indexer/base): add Manager and DecodeableModule
aaronc Jun 11, 2024
21b787a
WIP
aaronc Jun 11, 2024
b2e57cd
update listener
aaronc Jun 12, 2024
2313851
docs
aaronc Jun 12, 2024
e2fcb60
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jun 17, 2024
6466ac5
updates
aaronc Jun 17, 2024
c87f020
updates
aaronc Jun 17, 2024
4c793d7
docs
aaronc Jun 18, 2024
2ba1d7c
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jun 18, 2024
1088827
feat(schema/appdata)!: refactoring and packet support
aaronc Jul 3, 2024
812c6ee
docs
aaronc Jul 3, 2024
77ec7e7
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/schem…
aaronc Jul 3, 2024
bca3bb0
feat(schema/appdata): async listener mux'ing
aaronc Jul 3, 2024
26734fb
tests
aaronc Jul 3, 2024
d5b55a9
testing WIP
aaronc Jul 3, 2024
ac04ca0
add module filter
aaronc Jul 4, 2024
edfd4c6
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/schem…
aaronc Jul 4, 2024
09b0c97
feat(schema): decoding middleware
aaronc Jul 4, 2024
1a719bd
Merge branch 'main' into aaronc/indexer-base-manager
aaronc Jul 4, 2024
da90cdc
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jul 4, 2024
7be101e
WIP
aaronc Jul 4, 2024
8b32520
Merge branch 'aaronc/schema-decoding' into aaronc/indexer-base-manager
aaronc Jul 4, 2024
d98e7f7
Merge branch 'aaronc/schema-appdata-utils' into aaronc/indexer-base-m…
aaronc Jul 4, 2024
9600152
WIP
aaronc Jul 4, 2024
3a68b64
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jul 5, 2024
23ff67f
add manager API
aaronc Jul 5, 2024
d6cf655
update config
aaronc Jul 5, 2024
b5ca35a
add README, refactor
aaronc Jul 5, 2024
595df7c
add baseapp and simapp integration
aaronc Jul 5, 2024
40e4437
docs updates
aaronc Jul 5, 2024
afce42b
docs
aaronc Jul 5, 2024
6c40979
update go.mod
aaronc Jul 5, 2024
86ec5bd
fixes
aaronc Jul 5, 2024
8ac5625
Merge branch 'main' into aaronc/indexer-base-manager
aaronc Jul 8, 2024
cbc8a95
lint fix
aaronc Jul 9, 2024
047ffb4
Merge remote-tracking branch 'origin/aaronc/indexer-base-manager' int…
aaronc Jul 9, 2024
7d42710
Merge branch 'main' of github.com:cosmos/cosmos-sdk into aaronc/index…
aaronc Jul 9, 2024
7fc0c03
go mod tidy
aaronc Jul 9, 2024
199d06b
simapp build fix
aaronc Jul 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions indexer/base/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Indexer Base

The indexer base module is designed to provide a stable, zero-dependency base layer for the built-in indexer functionality. Packages that integrate with the indexer should feel free to depend on this package without fear of any external dependencies being pulled in.

The basic types for specifying index sources, targets and decoders are provided here along with a basic engine that ties these together. A package wishing to be an indexing source could accept an instance of `Engine` directly to be compatible with indexing. A package wishing to be a decoder can use the `Entity` and `Table` types. A package defining an indexing target should implement the `Indexer` interface.
8 changes: 8 additions & 0 deletions indexer/base/catch_up.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package indexerbase

// CatchUpSource is an interface that allows indexers to start indexing modules with pre-existing state.
type CatchUpSource interface {

// IterateAllKVPairs iterates over all key-value pairs for a given module.
IterateAllKVPairs(moduleName string, fn func(key, value []byte) error) error
}
151 changes: 151 additions & 0 deletions indexer/base/column.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package indexerbase

import "fmt"

// Column represents a column in a table schema.
type Column struct {
// Name is the name of the column.
Name string

// Kind is the basic type of the column.
Kind Kind

// Nullable indicates whether null values are accepted for the column.
Nullable bool

// AddressPrefix is the address prefix of the column's kind, currently only used for Bech32AddressKind.
AddressPrefix string

// EnumDefinition is the definition of the enum type and is only valid when Kind is EnumKind.
EnumDefinition EnumDefinition
}

// EnumDefinition represents the definition of an enum type.
type EnumDefinition struct {
// Name is the name of the enum type.
Name string

// Values is a list of distinct values that are part of the enum type.
Values []string
}

// Validate validates the column.
func (c Column) Validate() error {
// non-empty name
if c.Name == "" {
return fmt.Errorf("column name cannot be empty")
}

// valid kind
if err := c.Kind.Validate(); err != nil {
return fmt.Errorf("invalid column type for %q: %w", c.Name, err)
}

// address prefix only valid with Bech32AddressKind
if c.Kind == Bech32AddressKind && c.AddressPrefix == "" {
Fixed Show fixed Hide fixed
return fmt.Errorf("missing address prefix for column %q", c.Name)
} else if c.Kind != Bech32AddressKind && c.AddressPrefix != "" {
Fixed Show fixed Hide fixed
return fmt.Errorf("address prefix is only valid for column %q with type Bech32AddressKind", c.Name)
}

// enum definition only valid with EnumKind
if c.Kind == EnumKind {
if err := c.EnumDefinition.Validate(); err != nil {
return fmt.Errorf("invalid enum definition for column %q: %w", c.Name, err)
}
} else if c.Kind != EnumKind && c.EnumDefinition.Name != "" && c.EnumDefinition.Values != nil {
return fmt.Errorf("enum definition is only valid for column %q with type EnumKind", c.Name)
}

return nil
}

// Validate validates the enum definition.
func (e EnumDefinition) Validate() error {
if e.Name == "" {
return fmt.Errorf("enum definition name cannot be empty")
}
if len(e.Values) == 0 {
return fmt.Errorf("enum definition values cannot be empty")
}
seen := make(map[string]bool, len(e.Values))
for i, v := range e.Values {
if v == "" {
return fmt.Errorf("enum definition value at index %d cannot be empty for enum %s", i, e.Name)
}
if seen[v] {
return fmt.Errorf("duplicate enum definition value %q for enum %s", v, e.Name)
}
seen[v] = true
}
return nil
}

// ValidateValue validates that the value conforms to the column's kind and nullability.
// It currently does not do any validation that IntegerKind, DecimalKind, Bech32AddressKind, or EnumKind
// values are valid for their respective types behind conforming to the correct go type.
func (c Column) ValidateValue(value any) error {
if value == nil {
if !c.Nullable {
return fmt.Errorf("column %q cannot be null", c.Name)
}
return nil
}
return c.Kind.ValidateValueType(value)
}

// ValidateKey validates that the value conforms to the set of columns as a Key in an EntityUpdate.
// See EntityUpdate.Key for documentation on the requirements of such values.
func ValidateKey(cols []Column, value any) error {
if len(cols) == 0 {
return nil
}

if len(cols) == 1 {
return cols[0].ValidateValue(value)
}

values, ok := value.([]any)
if !ok {
return fmt.Errorf("expected slice of values for key columns, got %T", value)
}

if len(cols) != len(values) {
return fmt.Errorf("expected %d key columns, got %d values", len(cols), len(value.([]any)))
}
for i, col := range cols {
if err := col.ValidateValue(values[i]); err != nil {
return fmt.Errorf("invalid value for key column %q: %w", col.Name, err)
}
}
return nil
}

// ValidateValue validates that the value conforms to the set of columns as a Value in an EntityUpdate.
// See EntityUpdate.Value for documentation on the requirements of such values.
func ValidateValue(cols []Column, value any) error {
valueUpdates, ok := value.(ValueUpdates)
if ok {
colMap := map[string]Column{}
for _, col := range cols {
colMap[col.Name] = col
}
var errs []error
valueUpdates.Iterate(func(colName string, value any) bool {
col, ok := colMap[colName]
if !ok {
errs = append(errs, fmt.Errorf("unknown column %q in value updates", colName))
}
if err := col.ValidateValue(value); err != nil {
errs = append(errs, fmt.Errorf("invalid value for column %q: %w", colName, err))
}
return true
})
if len(errs) > 0 {
return fmt.Errorf("validation errors: %v", errs)
}
return nil
} else {
return ValidateKey(cols, value)
}
}
7 changes: 7 additions & 0 deletions indexer/base/column_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package indexerbase

import "testing"

func TestColumnValidate(t *testing.T) {

}
93 changes: 93 additions & 0 deletions indexer/base/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package indexerbase

import "sort"

type DecoderResolver interface {
// Iterate iterates over all module decoders which should be initialized at startup.
Iterate(func(string, ModuleDecoder) error) error

// LookupDecoder allows for resolving decoders dynamically. For instance, some module-like
// things may come into existence dynamically (like x/accounts or EVM or WASM contracts).
// The first time the manager sees one of these appearing in KV-store writes, it will
// lookup a decoder for it and cache it for future use. The manager will also perform
// a catch-up sync before passing any new writes to ensure that all historical state has
// been synced if there is any This check will only happen the first time a module is seen
// by the manager in a given process (a process restart will cause this check to happen again).
LookupDecoder(moduleName string) (decoder ModuleDecoder, found bool, err error)
}

// DecodableModule is an interface that modules can implement to provide a ModuleDecoder.
// Usually these modules would also implement appmodule.AppModule, but that is not included
// to leave this package free of any dependencies.
type DecodableModule interface {

// ModuleDecoder returns a ModuleDecoder for the module.
ModuleDecoder() (ModuleDecoder, error)
}

// ModuleDecoder is a struct that contains the schema and a KVDecoder for a module.
type ModuleDecoder struct {
// Schema is the schema for the module.
Schema ModuleSchema

// KVDecoder is a function that decodes a key-value pair into an EntityUpdate.
// If modules pass logical updates directly to the engine and don't require logical decoding of raw bytes,
// then this function should be nil.
KVDecoder KVDecoder
}

// KVDecoder is a function that decodes a key-value pair into an EntityUpdate.
// If the KV-pair doesn't represent an entity update, the function should return false
// as the second return value. Error should only be non-nil when the decoder expected
// to parse a valid update and was unable to.
type KVDecoder = func(key, value []byte) (EntityUpdate, bool, error)

type appModuleDecoderResolver[ModuleT any] struct {
moduleSet map[string]ModuleT
}

// NewAppModuleDecoderResolver returns DecoderResolver that will discover modules implementing
// DecodeableModule in the provided module set.
func NewAppModuleDecoderResolver[ModuleT any](moduleSet map[string]ModuleT) DecoderResolver {
return &appModuleDecoderResolver[ModuleT]{
moduleSet: moduleSet,
}
}

func (a appModuleDecoderResolver[ModuleT]) Iterate(f func(string, ModuleDecoder) error) error {
keys := make([]string, 0, len(a.moduleSet))
for k := range a.moduleSet {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
module := a.moduleSet[k]
dm, ok := any(module).(DecodableModule)
if ok {
decoder, err := dm.ModuleDecoder()
if err != nil {
return err
}
err = f(k, decoder)
if err != nil {
return err
}
}
}
return nil
}

func (a appModuleDecoderResolver[ModuleT]) LookupDecoder(moduleName string) (ModuleDecoder, bool, error) {
mod, ok := a.moduleSet[moduleName]
if !ok {
return ModuleDecoder{}, false, nil
}

dm, ok := any(mod).(DecodableModule)
if !ok {
return ModuleDecoder{}, false, nil
}

decoder, err := dm.ModuleDecoder()
return decoder, true, err
}
40 changes: 40 additions & 0 deletions indexer/base/entity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package indexerbase

// EntityUpdate represents an update operation on an entity in the schema.
type EntityUpdate struct {
// TableName is the name of the table that the entity belongs to in the schema.
TableName string

// Key returns the value of the primary key of the entity and must conform to these constraints with respect
// that the schema that is defined for the entity:
// - if key represents a single column, then the value must be valid for the first column in that
// column list. For instance, if there is one column in the key of type String, then the value must be of
// type string
// - if key represents multiple columns, then the value must be a slice of values where each value is valid
// for the corresponding column in the column list. For instance, if there are two columns in the key of
// type String, String, then the value must be a slice of two strings.
// If the key has no columns, meaning that this is a singleton entity, then this value is ignored and can be nil.
Key any

// Value returns the non-primary key columns of the entity and can either conform to the same constraints
// as EntityUpdate.Key or it may be and instance of ValueUpdates. ValueUpdates can be used as a performance
// optimization to avoid copying the values of the entity into the update and/or to omit unchanged columns.
// If this is a delete operation, then this value is ignored and can be nil.
Value any

// Delete is a flag that indicates whether this update is a delete operation. If true, then the Value field
// is ignored and can be nil.
Delete bool
}

// ValueUpdates is an interface that represents the value columns of an entity update. Columns that
// were not updated may be excluded from the update. Consumers should be aware that implementations
// may not filter out columns that were unchanged. However, if a column is omitted from the update
// it should be considered unchanged.
type ValueUpdates interface {

// Iterate iterates over the columns and values in the entity update. The function should return
// true to continue iteration or false to stop iteration. Each column value should conform
// to the requirements of that column's type in the schema.
Iterate(func(col string, value any) bool)
}
6 changes: 6 additions & 0 deletions indexer/base/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module cosmossdk.io/indexer/base

// NOTE: this go.mod should have zero dependencies and remain on an older version of Go
// to be compatible with legacy codebases.

go 1.19
Empty file added indexer/base/go.sum
Empty file.
Loading
Loading