Skip to content

Commit

Permalink
feat(server/v2): init the indexer in server/v2 (partial backport #22218
Browse files Browse the repository at this point in the history
…) (#22324)

Co-authored-by: cool-developer <[email protected]>
Co-authored-by: Julien Robert <[email protected]>
  • Loading branch information
3 people authored Oct 21, 2024
1 parent 39a726b commit e87562a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 7 deletions.
5 changes: 0 additions & 5 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// SetListener sets the listener for the consensus module.
func (c *Consensus[T]) SetListener(l *appdata.Listener) {
c.listener = l
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
Expand Down
8 changes: 7 additions & 1 deletion server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cometbft
import (
cmtcfg "github.com/cometbft/cometbft/config"

"cosmossdk.io/schema/indexer"
"cosmossdk.io/server/v2/cometbft/mempool"
)

Expand All @@ -23,6 +24,10 @@ func DefaultAppTomlConfig() *AppTomlConfig {
Trace: false,
Standalone: false,
Mempool: mempool.DefaultConfig(),
Indexer: indexer.IndexingConfig{
Target: make(map[string]indexer.Config),
ChannelBufferSize: 1024,
},
}
}

Expand All @@ -37,7 +42,8 @@ type AppTomlConfig struct {
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`

// Sub configs
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
}

// CfgOption is a function that allows to overwrite the default server configuration.
Expand Down
15 changes: 15 additions & 0 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/indexer"
serverv2 "cosmossdk.io/server/v2"
cometlog "cosmossdk.io/server/v2/cometbft/log"
"cosmossdk.io/server/v2/cometbft/mempool"
Expand Down Expand Up @@ -131,6 +132,20 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg
return err
}
consensus.snapshotManager = snapshots.NewManager(snapshotStore, s.serverOptions.SnapshotOptions(cfg), sc, ss, nil, s.logger)

// initialize the indexer
if indexerCfg := s.config.AppTomlConfig.Indexer; len(indexerCfg.Target) > 0 {
listener, err := indexer.StartIndexing(indexer.IndexingOptions{
Config: indexerCfg,
Resolver: appI.SchemaDecoderResolver(),
Logger: s.logger.With(log.ModuleKey, "indexer"),
})
if err != nil {
return fmt.Errorf("failed to start indexing: %w", err)
}
consensus.listener = &listener.Listener
}

s.Consensus = consensus

return nil
Expand Down
10 changes: 9 additions & 1 deletion tools/confix/data/v2-app.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ standalone = false
# max-txs defines the maximum number of transactions that can be in the mempool. A value of 0 indicates an unbounded mempool, a negative value disables the app-side mempool.
max-txs = -1

# indexer defines the configuration for the SDK built-in indexer implementation.
[comet.indexer]
# Buffer size of the channels used for buffering data sent to indexer go routines.
channel_buffer_size = 1024

# Target is a map of named indexer targets to their configuration.
[comet.indexer.target]

[grpc]
# Enable defines if the gRPC server should be enabled.
enable = true
Expand Down Expand Up @@ -77,7 +85,7 @@ skip-fast-storage-upgrade = true
# Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus.
enable = true
# Address defines the metrics server address to bind to.
address = 'localhost:1338'
address = 'localhost:1318'
# Prefixed with keys to separate services.
service-name = ''
# Enable prefixing gauge values with hostname.
Expand Down

0 comments on commit e87562a

Please sign in to comment.