Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ADR-038: State change indexing with Kafka #2

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ vagrant
# IDE
.idea
*.iml
*.ipr
*.iws
.dir-locals.el
.vscode

Expand Down
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,13 @@ test-rosetta:
docker-compose -f contrib/rosetta/docker-compose.yaml up --abort-on-container-exit --exit-code-from test_rosetta --build
.PHONY: test-rosetta

test-plugin-kafka:
@echo "Starting Kafka (logging disabled)..." ; \
docker-compose -f plugin/plugins/kafka/docker-compose.yml up -d zookeeper broker ; \
echo "Running unit tests for kafka plugin..." ; \
go test -mod=readonly $(ARGS) ./plugin/plugins/kafka/... ; \
docker-compose -f plugin/plugins/kafka/docker-compose.yml down ; \

benchmark:
@go test -mod=readonly -bench=. $(PACKAGES_NOSIMULATION)
.PHONY: benchmark
Expand Down
23 changes: 20 additions & 3 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the hooks with the BeginBlock messages
for _, streamingListener := range app.streamingListeners {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
Expand All @@ -224,7 +224,7 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
}

// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.streamingListeners {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
Expand Down Expand Up @@ -279,7 +279,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx

var res abci.ResponseDeliverTx
defer func() {
for _, streamingListener := range app.streamingListeners {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
Expand Down Expand Up @@ -340,6 +340,23 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
halt = true
}

// each listener has an internal wait threshold after which it sends `false` to the ListenSuccess() channel
// but the BaseApp also imposes a global wait limit
maxWait := time.NewTicker(app.globalWaitLimit * time.Second)
egaxhaj marked this conversation as resolved.
Show resolved Hide resolved
defer maxWait.Stop()
for _, lis := range app.abciListeners {
select {
case success := <- lis.ListenSuccess():
if success == false {
halt = true
break
}
case <- maxWait.C:
halt = true
break
}
}

if halt {
// Halt the binary and allow Tendermint to receive the ResponseCommit
// response with the commit ID hash. This will allow the node to successfully
Expand Down
13 changes: 12 additions & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"reflect"
"time"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
Expand All @@ -27,6 +28,12 @@ const (
runTxModeDeliver // Deliver a transaction
)

// TOML configuration parameter keys
const (
// GlobalWaitLimitParam is the global wait limit threshold for delivery of messages to external services.
GlobalWaitLimitParam = "globalWaitLimit"
)

var (
_ abci.Application = (*BaseApp)(nil)
)
Expand Down Expand Up @@ -129,7 +136,11 @@ type BaseApp struct { // nolint: maligned

// StreamingListener for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
streamingListeners []streaming.Listener
abciListeners []streaming.ABCIListener

// globalWaitLimit is used to configure a global wait limit for receiving positive
// acknowledgement of message receipt from the integrated StreamingServices.
globalWaitLimit time.Duration
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
10 changes: 8 additions & 2 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package baseapp
import (
"fmt"
"io"
"time"

dbm "github.com/tendermint/tm-db"

Expand Down Expand Up @@ -237,7 +238,12 @@ func (app *BaseApp) SetStreamingService(s streaming.Service) {
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
}
// register the streamingListeners within the BaseApp
// register the abciListeners within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks
app.streamingListeners = append(app.streamingListeners, s)
app.abciListeners = append(app.abciListeners, s)
}

// SetGlobalWaitLimit is used to set the global wait limit threshold for processed messages of external streaming services.
func (app *BaseApp) SetGlobalWaitLimit(t time.Duration) {
app.globalWaitLimit = t
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/coinbase/rosetta-sdk-go v0.6.10
github.com/confio/ics23/go v0.6.6
github.com/confluentinc/confluent-kafka-go v1.7.0
github.com/cosmos/cosmos-proto v0.0.0-20210914142853-23ed61ac79ce
github.com/cosmos/go-bip39 v1.0.0
github.com/cosmos/iavl v0.17.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ github.com/coinbase/rosetta-sdk-go v0.6.10/go.mod h1:J/JFMsfcePrjJZkwQFLh+hJErkA
github.com/confio/ics23/go v0.6.3/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg=
github.com/confio/ics23/go v0.6.6 h1:pkOy18YxxJ/r0XFDCnrl4Bjv6h4LkBSpLS6F38mrKL8=
github.com/confio/ics23/go v0.6.6/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg=
github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
Expand Down
23 changes: 23 additions & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,29 @@ of names for the plugins we want to disable (useful for disabling preloaded plug
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
writeDir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"

[streaming.kafka] # the specific parameters for the Kafka streaming service plugin
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
# The topic where data will be stored
topic = "block"
# Flush and wait for outstanding messages and requests to complete delivery. (milliseconds)
flushTimeoutMs = 1500

# Producer configuration properties.
# The plugin uses confluent-kafka-go which is a lightweight wrapper around librdkafka.
# For a full list of producer configuration properties
# see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
[streaming.kafka.producer]
bootstrap.servers = "localhost:9092"
client.id = "my-app-id"
acks = "all"
# When set to true, the producer will ensure that messages
# are successfully produced exactly once and in the original produce order.
# The following configuration properties are adjusted automatically (if not modified by the user)
# when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5),
# retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo.
# Producer instantation will fail if user-supplied configuration is incompatible.
enable.idempotence = true
```

As mentioned above, some plugins can be preloaded. This means they do not need to be loaded from the specified `plugins.dir` and instead
Expand Down
8 changes: 4 additions & 4 deletions plugin/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func NewPluginLoader(opts serverTypes.AppOptions, logger logging.Logger) (*Plugi
return nil, err
}
}
loader.disabled = cast.ToStringSlice(opts.Get(plugin.PLUGIN_DISABLED_TOML_KEY))
pluginDir := cast.ToString(opts.Get(plugin.PLUGIN_DIR_TOML_KEY))
loader.disabled = cast.ToStringSlice(opts.Get(fmt.Sprintf("%s.%s",plugin.PLUGIN_TOML_KEY, plugin.PLUGIN_DISABLED_TOML_KEY)))
pluginDir := cast.ToString(opts.Get(fmt.Sprintf("%s.%s", plugin.PLUGIN_TOML_KEY, plugin.PLUGIN_DIR_TOML_KEY)))
if pluginDir == "" {
pluginDir = filepath.Join(os.Getenv("GOPATH"), plugin.DEFAULT_PLUGIN_DIRECTORY)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (loader *PluginLoader) Inject(bApp *baseapp.BaseApp, marshaller codec.Binar
}

for _, pl := range loader.plugins {
if pl, ok := pl.(plugin.StreamingService); ok {
if pl, ok := pl.(plugin.StateStreamingPlugin); ok {
if err := pl.Register(bApp, marshaller, keys); err != nil {
loader.state = loaderFailed
return err
Expand All @@ -252,7 +252,7 @@ func (loader *PluginLoader) Start(wg *sync.WaitGroup) error {
return err
}
for _, pl := range loader.plugins {
if pl, ok := pl.(plugin.StreamingService); ok {
if pl, ok := pl.(plugin.StateStreamingPlugin); ok {
pl.Start(wg)
loader.started = append(loader.started, pl)
}
Expand Down
5 changes: 2 additions & 3 deletions plugin/loader/preload.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package loader

import (
file "github.com/cosmos/cosmos-sdk/plugin/plugins/file"
plugintrace "github.com/cosmos/cosmos-sdk/plugin/plugins/trace"
)

// DO NOT EDIT THIS FILE
// This file is being generated as part of plugin build process
// To change it, modify the plugin/loader/preload.sh

func init() {
Preload(file.Plugins...)
Preload(plugintrace.Plugins...)
}
4 changes: 3 additions & 1 deletion plugin/loader/preload_list
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
#
# name go-path number of the sub-plugin or *

file github.com/cosmos/cosmos-sdk/plugin/plugins/file *
trace github.com/cosmos/cosmos-sdk/plugin/plugins/trace *
#file github.com/cosmos/cosmos-sdk/plugin/plugins/file *
#kafka github.com/cosmos/cosmos-sdk/plugin/plugins/kafka *
6 changes: 3 additions & 3 deletions plugin/plugins/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type streamingServicePlugin struct {
opts serverTypes.AppOptions
}

var _ plugin.StreamingService = (*streamingServicePlugin)(nil)
var _ plugin.StateStreamingPlugin = (*streamingServicePlugin)(nil)

// Name satisfies the plugin.Plugin interface
func (ssp *streamingServicePlugin) Name() string {
Expand All @@ -63,7 +63,7 @@ func (ssp *streamingServicePlugin) Init(env serverTypes.AppOptions) error {
return nil
}

// Register satisfies the plugin.StreamingService interface
// Register satisfies the plugin.StateStreamingPlugin interface
func (ssp *streamingServicePlugin) Register(bApp *baseapp.BaseApp, marshaller codec.BinaryCodec, keys map[string]*sdk.KVStoreKey) error {
// load all the params required for this plugin from the provided AppOptions
filePrefix := cast.ToString(ssp.opts.Get(fmt.Sprintf("%s.%s.%s", plugin.PLUGIN_TOML_KEY, PLUGIN_NAME, PREFIX_PARAM)))
Expand Down Expand Up @@ -94,7 +94,7 @@ func (ssp *streamingServicePlugin) Register(bApp *baseapp.BaseApp, marshaller co
return nil
}

// Start satisfies the plugin.StreamingService interface
// Start satisfies the plugin.StateStreamingPlugin interface
func (ssp *streamingServicePlugin) Start(wg *sync.WaitGroup) {
ssp.fss.Stream(wg)
}
Expand Down
4 changes: 4 additions & 0 deletions plugin/plugins/file/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ func (fss *FileStreamingService) Close() error {
return nil
}

func (fss *FileStreamingService) ListenSuccess() <-chan bool {
panic("implement me")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!!!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just making the compiler happy.

}

// isDirWriteable checks if dir is writable by writing and removing a file
// to dir. It returns nil if dir is writable.
func isDirWriteable(dir string) error {
Expand Down
Loading