Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ADR 038 plugin system #11691

Closed
Closed
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
885b45a
patch adr-038 doc
i-norden Nov 29, 2021
00756c7
plugin interfaces
i-norden Nov 29, 2021
daed581
plugin loader/preloader
i-norden Nov 29, 2021
fdb450d
plugin documentation
i-norden Nov 29, 2021
5d33e23
file writing plugin
i-norden Nov 29, 2021
a6678a1
baseapp pkg updates
i-norden Nov 29, 2021
bdbeaab
simapp integration
i-norden Nov 29, 2021
f2306ff
remove old store/streaming pkg
i-norden Nov 29, 2021
72f213b
fixes
i-norden Nov 29, 2021
3879ae0
did not port from cherry pick
Feb 3, 2022
283f2c4
add trace plugin
Feb 3, 2022
6d1944c
update readme
Feb 3, 2022
98240f5
updates trace plugin
Feb 4, 2022
2421b33
fix trace test
Feb 4, 2022
07aaf16
remove unused params
Feb 4, 2022
70b3a08
add kafka plugin
Feb 4, 2022
d79f52a
setup non-deterministic testing
Feb 4, 2022
be4bb2a
opt-in approach to enabling plugins
Feb 4, 2022
f21ff7e
synchronize work between kafka and app.Commit()
Feb 4, 2022
222394b
remove dependency on ack channel and use listener error response to act
Feb 5, 2022
7fd7806
fromatting
Feb 11, 2022
2680478
concurrent listener calls
Feb 18, 2022
d86ce53
async fire-and-forget when halt_app_on_delivery_error = false
Mar 2, 2022
7f6173a
updated comments for HaltAppOnDeliveryError
Mar 3, 2022
cdfe1b0
improve non-determinism tests for state listening
Mar 4, 2022
8ed57c4
continue with testing when docker-compose returns error
Mar 10, 2022
a20a6f9
add fallback timer to kill indefinite running listener goroutines
Mar 15, 2022
0ad44b7
update comment
Mar 16, 2022
9570199
fix typo
Mar 16, 2022
3c19fdb
disable delivery report when in fire-and-forget mode
Mar 23, 2022
6c0530f
code improvement
Mar 23, 2022
63ea30b
fix config param
Mar 29, 2022
7f4ed75
remove fallback timer, long upgrades may trigger it
Mar 29, 2022
e08fbf7
use golang enum type for message key
Mar 30, 2022
603dca4
default to fire-and-forget for sim testing
Mar 30, 2022
08d6de9
serialize to protobuf binary
Apr 5, 2022
1f2ed49
remove global wait timout
Apr 11, 2022
2bfc803
update plugin docs
Apr 12, 2022
2625529
update ADR to reflect latest proposal
Apr 12, 2022
af1f9fc
tidy up
Apr 19, 2022
ad670f2
update changelog
Apr 19, 2022
11bc48b
unlock stateCacheLock to prevent deadlock on error
Jun 9, 2022
331aa94
fix format input count
Jul 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ vagrant
# IDE
.idea
*.iml
*.ipr
*.iws
.dir-locals.el
.vscode

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [\#10962](https://github.com/cosmos/cosmos-sdk/pull/10962) ADR-040: Add state migration from iavl (v1Store) to smt (v2Store)
* (types) [\#10948](https://github.com/cosmos/cosmos-sdk/issues/10948) Add `app-db-backend` to the `app.toml` config to replace the compile-time `types.DBbackend` variable.
* (authz)[\#11060](https://github.com/cosmos/cosmos-sdk/pull/11060) Support grant with no expire time.
* [#11691](https://github.com/cosmos/cosmos-sdk/pull/11691) Plugin architecture for ADR-038

### API Breaking Changes

Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ endif

.PHONY: run-tests test test-all $(TEST_TARGETS)

# Sim tests with state listening plugins enabled
include sim-state-listening.mk

test-sim-nondeterminism:
@echo "Running non-determinism test..."
@go test -mod=readonly $(SIMAPP) -run TestAppStateDeterminism -Enabled=true \
Expand Down
74 changes: 66 additions & 8 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"sort"
"strings"
"sync"
"syscall"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -190,11 +191,30 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the hooks with the BeginBlock messages
wg := new(sync.WaitGroup)
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines
if streamingListener.HaltAppOnDeliveryError() {
// increment the wait group counter
wg.Add(1)
go func() {
// decrement the counter when the go routine completes
defer wg.Done()
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
app.halt()
}
}()
} else {
go func() {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
}()
Comment on lines +209 to +213

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this goroutine is spawned and executed asynchronously, it's possible that it's scheduled when app.deliverState has one value, and executes when app.deliverState has a different value, e.g. representing the next block. This would mean that ListenBeginBlock will be called with a ctx that doesn't correspond to its sibling req and res parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been debugging an AppHash mismatch error when I test the plugins on our provenance blockchain. This is most likely the cause.

Our initial thinking here was that we wanted to make calls to multiple listeners asynchronous. I'll test this out with synchronized listeners and report my findings.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary consequence of making listeners synchronous is that slow listeners can impact node liveness. If HaltAppOnDeliveryError is true, then users are explicitly opting in to this risk, so it's (arguably) OK there. But if that setting is false, then I think it would be a mistake to make delivery synchronous.

The solution here is to produce the complete event when this method is called, and emit that event value to listeners. The event value would need to contain only plain-old-data, i.e. no pointers or references to anything else in the node or app or etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been debugging an AppHash mismatch error when I test the plugins on our provenance blockchain. This is most likely the cause.

Our initial thinking here was that we wanted to make calls to multiple listeners asynchronous. I'll test this out with synchronized listeners and report my findings.

I tested this and it is not the cause of the AppHash error I see on a two node localnet. I disabled all the listeners in abci.go (thinking this was the cause) and I was able to narrow it down to either the Inject() or Start() as the cause. When you turn on the plugin system and enable any plugin it will result in an AppHash error. I also used the iaveiwer tool but didn't see any differences in the iAVL tree. This leads me to think the state change is happening at runtime before it's persisted.

Something like.

panic: Failed to process committed block (11:AF09D0FF0AACA156034874C0A14F3C2A4EB4FF384D22201062AF5E4ADD2FAE4A): wrong Block.Header.AppHash.  Expected 6AC03C53EC307EFA46AA61B006833C9C6A6A37C1B54D8B184D2B1B468D8026AB, got 7003EC3FBBDEADC9D62E27C8DF75B27BDF7EF4B575E5653E8D7921571CB9C64C

@i-norden ^^^ perhaps you can take a look at the plugin system and maybe find what is changing the app state.

The primary consequence of making listeners synchronous is that slow listeners can impact node liveness. If HaltAppOnDeliveryError is true, then users are explicitly opting in to this risk, so it's (arguably) OK there. But if that setting is false, then I think it would be a mistake to make delivery synchronous.

The solution here is to produce the complete event when this method is called, and emit that event value to listeners. The event value would need to contain only plain-old-data, i.e. no pointers or references to anything else in the node or app or etc.

I agree ^^^. I was testing to see if asynchronous vs synchronous listeners was the cause for my AppHash error above. We'll also need to pass in any addition info required to keep track of what block is being emitted etc.

so the new listener interface could look like this?

type ABCIListener interface {
	ListenBeginBlock(logger log.Logger, blockHeight int64, req []byte, res []byte) error
	ListenEndBlock(logger log.Logger, blockHeight int64, req []byte, res []byte) error
	ListenDeliverTx(logger log.Logger, blockHeight int64, req []byte, res []byte) error
	HaltAppOnDeliveryError() bool
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An ABCIListener interface should

  1. Take a context.Context as a first parameter, and respect context cancellation
  2. Already know about any log.Logger it needs, and therefore not take a logger as a parameter
  3. Not need to signal HaltAppOnDeliveryError thru a method, but instead either
    a. Halt the app directly via os.Exit or panic or whatever, or
    b. Signal that information via return error from each method, detected by the caller via errors.Is/As

}
}
// wait for all the listener calls to finish
wg.Wait()

return res
}
Expand All @@ -215,12 +235,31 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
res.ConsensusParamUpdates = cp
}

// call the streaming service hooks with the EndBlock messages
// call the hooks with the BeginBlock messages
wg := new(sync.WaitGroup)
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines
if streamingListener.HaltAppOnDeliveryError() {
// increment the wait group counter
wg.Add(1)
go func() {
// decrement the counter when the go routine completes
defer wg.Done()
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
app.halt()
}
}()
} else {
go func() {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
}()
}
}
// wait for all the listener calls to finish
wg.Wait()

return res
}
Expand Down Expand Up @@ -269,11 +308,31 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx

var abciRes abci.ResponseDeliverTx
defer func() {
// call the hooks with the BeginBlock messages
wg := new(sync.WaitGroup)
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, abciRes); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines
if streamingListener.HaltAppOnDeliveryError() {
// increment the wait group counter
wg.Add(1)
go func() {
// decrement the counter when the go routine completes
defer wg.Done()
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, abciRes); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
app.halt()
}
}()
} else {
go func() {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, abciRes); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}()
}
}
// wait for all the listener calls to finish
wg.Wait()
}()

ctx := app.getContextForTx(runTxModeDeliver, req.Tx)
Expand All @@ -289,7 +348,6 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
}

return abciRes

}

// Commit implements the ABCI interface. It will commit all state that exists in
Expand Down
1 change: 0 additions & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
Expand Down
3 changes: 1 addition & 2 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package baseapp

import (
"fmt"
"io"

dbm "github.com/tendermint/tm-db"
"io"

"github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/snapshots"
Expand Down
7 changes: 7 additions & 0 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ type ABCIListener interface {
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
// HaltAppOnDeliveryError returns true if the application has been configured to halt when
// ListenBeginBlock, ListenEndBlock, ListenDeliverTx fail to process messages and false when
// the application has been configured to send messages to ListenBeginBlock, ListenEndBlock, ListenDeliverTx
// in fire-and-forget fashion.
//
// This behavior is controlled by a corresponding app config setting.
HaltAppOnDeliveryError() bool

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, implementations of ABCIListener which are configured to HaltAppOnDeliveryError should terminate the process if any of the Listen-class methods return a non-nil error. Is that correct? If so, would it not be simpler to have those method implementations halt the app directly, rather than delegating that responsibility to the caller?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

app.halt() is private so it would not be possible to halt the app from the listeners. Also, do we want to expose more of the internal API to the listeners?

func (app *BaseApp) halt() {

@i-norden ^^^

Copy link

@peterbourgon peterbourgon Jun 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that app.halt just kills the process un-cleanly. Presuming that is the intended behavior, you don't need a special method to do it :) as you can call os.Exit(0) — or, rather, os.Exit(1), as 0 indicates success, which an app halt most certainly isn't — from anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) very true but it's an unclean way of exiting. Also, it will bypass error reporting in the listener methods. ListenBeginBlock, ListenEndBlock, ListenDeliverTx report errors up the stack so their method signature will need to change.

We should have the the broader team chime in on this a bit and provide feedback as to how we want plugins to behave in regards to this.

app.halt does eventually os.Exit(0) so perhaps we just fix its behavior?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@egaxhaj

very true but it's an unclean way of exiting. Also, it will bypass error reporting in the listener methods . . .

Sure! Halting an app is (AFAIK) an unclean exit, by definition. I don't think there's any way to reliably issue a "halt" from within the SDK that (a) actually terminates the process, and (b) unwinds the call stack, and captures/logs errors, in a clean and predictable way. Happy to be corrected.

app.halt does eventually os.Exit(0) so perhaps we just fix its behavior?

Is the current behavior not correct? What would be more correct?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my 2 cents is this is a secondary system that should have no effect on the app and consensus. If the indexer is halting the app to not lose data then a different approach needs to be thought of in which it doesn't affect the app.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marbar3778 - @i-norden and I have had discussions about how a secondary system would work that does not effect the app. However, we agreed to move forward with the current design to allow for time to be able to work through a secondary system design.

}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
Expand Down
Loading