Skip to content

Commit

Permalink
fix: state listener observe writes at wrong time (backport #13516) (#…
Browse files Browse the repository at this point in the history
…14139)

* fix: state listener observe writes at wrong time (#13516)

* fix: state listener observe writes at wrong time

Closes: #13457

Currently state listener is notified when the cache store write, which happens in commit event only, which breaks the current design.
The solution (as discussed in the issue) is to listen state writes on rootmulti store only.

It also changes the file streamer to output single data file for the writes in the whole block, since we can't distinguish writes from different stage of abci events.

It adds new config items for file streamer:
- streamers.file.output-metadata
- streamers.file.stop-node-on-error
- streamers.file.fsync

* synchronous abci call, and format doc

* fix comment

* update file streamer readme and fix typos

* typo

* fix: state listener observe writes at wrong time

Closes: #13457

Currently state listener is notified when the cache store write, which happens in commit event only, which breaks the current design.
The solution (as discussed in the issue) is to listen state writes on rootmulti store only.

It also changes the file streamer to output single data file for the writes in the whole block, since we can't distinguish writes from different stage of abci events.

It adds new config items for file streamer:
- streamers.file.output-metadata
- streamers.file.stop-node-on-error
- streamers.file.fsync

synchronous abci call, and format doc

fix comment

update file streamer readme and fix typos

typo

* improve UX of file streamer, make it immediately usable after enabled

- set default value to write_dir.
- make write_dir based on home directory by default.
- auto-create the directory if not exists.

* get homePage from opts

Co-authored-by: Marko <[email protected]>
(cherry picked from commit 1f91ee2)

# Conflicts:
#	CHANGELOG.md
#	api/cosmos/base/store/v1beta1/listening.pulsar.go
#	baseapp/abci.go
#	baseapp/streaming.go
#	docs/architecture/adr-038-state-listening.md
#	server/config/config.go
#	server/config/toml.go
#	simapp/app.go
#	simapp/app_v2.go
#	store/cachemulti/store.go
#	store/iavl/store.go
#	store/mem/store.go
#	store/rootmulti/store.go
#	store/streaming/constructor.go
#	store/streaming/constructor_test.go
#	store/streaming/file/README.md
#	store/streaming/file/service.go
#	store/streaming/file/service_test.go
#	store/types/listening.pb.go
#	store/types/store.go

* `make proto-gen`, update changelog, delete uncessary files

* fix conflicts

* fix GetConfig

Co-authored-by: yihuang <[email protected]>
Co-authored-by: Julien Robert <[email protected]>
  • Loading branch information
3 people authored Dec 6, 2022
1 parent 9baf33b commit 4a62609
Show file tree
Hide file tree
Showing 36 changed files with 1,723 additions and 1,037 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (store) [#11646](https://github.com/cosmos/cosmos-sdk/pull/11646) Add store name in tracekv-emitted store traces
* (deps) Bump Tendermint version to [v0.34.24](https://github.com/tendermint/tendermint/releases/tag/v0.34.24).

### API Breaking Changes

* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Update State Streaming APIs:
* Add method `ListenCommit` to `ABCIListener`
* Move `ListeningEnabled` and `AddListener` methods to `CommitMultiStore`
* Remove `CacheWrapWithListeners` from `CacheWrap` and `CacheWrapper` interfaces
* Remove listening APIs from the caching layer (it should only listen to the `rootmulti.Store`)
* Add three new options to file streaming service constructor.
* Modify `ABCIListener` such that any error from any method will always halt the app via `panic`

### Bug Fixes

* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Fix state listener that was observing writes at wrong time.

## v0.45.11 - 2022-11-09

### Improvements
Expand Down
36 changes: 25 additions & 11 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,9 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg

// call the hooks with the BeginBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
if err := streamingListener.ListenBeginBlock(goCtx, req, res); err != nil {
panic(fmt.Errorf("BeginBlock listening hook failed, height: %d, err: %w", req.Header.Height, err))
}
}

Expand All @@ -209,8 +210,9 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc

// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
if err := streamingListener.ListenEndBlock(goCtx, req, res); err != nil {
panic(fmt.Errorf("EndBlock listening hook failed, height: %d, err: %w", req.Height, err))
}
}

Expand Down Expand Up @@ -263,8 +265,9 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv

defer func() {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
if err := streamingListener.ListenDeliverTx(goCtx, req, res); err != nil {
panic(fmt.Errorf("DeliverTx listening hook failed: %w", err))
}
}
}()
Expand Down Expand Up @@ -301,7 +304,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
// defined in config, Commit will execute a deferred function call to check
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (res abci.ResponseCommit) {
func (app *BaseApp) Commit() abci.ResponseCommit {
defer telemetry.MeasureSince(time.Now(), "abci", "commit")

header := app.deliverState.ctx.BlockHeader()
Expand All @@ -312,6 +315,20 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
// MultiStore (app.cms) so when Commit() is called is persists those values.
app.deliverState.ms.Write()
commitID := app.cms.Commit()

res := abci.ResponseCommit{
Data: commitID.Hash,
RetainHeight: retainHeight,
}

// call the hooks with the Commit message
for _, streamingListener := range app.abciListeners {
goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
if err := streamingListener.ListenCommit(goCtx, res); err != nil {
panic(fmt.Errorf("Commit listening hook failed, height: %d, err: %w", header.Height, err))
}
}

app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))

// Reset the Check state to the latest committed.
Expand Down Expand Up @@ -345,10 +362,7 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
go app.snapshot(header.Height)
}

return abci.ResponseCommit{
Data: commitID.Hash,
RetainHeight: retainHeight,
}
return res
}

// halt attempts to gracefully shutdown the node via SIGINT and SIGTERM falling
Expand Down
14 changes: 9 additions & 5 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package baseapp

import (
"context"
"io"
"sync"

abci "github.com/tendermint/tendermint/abci/types"

store "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/types"
)

// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp.
// the error results are propagated to consensus state machine,
// if you don't want to affect consensus, handle the errors internally and always return `nil` in these APIs.
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
// ListenCommit updates the steaming service with the latest Commit event
ListenCommit(ctx context.Context, res abci.ResponseCommit) error
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
Expand Down
Loading

0 comments on commit 4a62609

Please sign in to comment.