From 4549180a885634fd1f98c4e43ed62e9a2dc75f63 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Tue, 6 Dec 2022 07:00:17 +0000
Subject: [PATCH] fix: state listener observe writes at wrong time (backport
 #13516) (#14139)

* fix: state listener observe writes at wrong time (#13516)

* fix: state listener observe writes at wrong time

Closes: #13457

Currently the state listener is notified when the cache store writes, which
happens only in the commit event; this breaks the current design.
The solution (as discussed in the issue) is to listen to state writes on the
rootmulti store only.
It also changes the file streamer to output a single data file for the writes
in the whole block, since we can't distinguish writes from the different
stages of ABCI events.

It adds new config items for the file streamer:
- streamers.file.output-metadata
- streamers.file.stop-node-on-error
- streamers.file.fsync

* synchronous abci call, and format doc

* fix comment

* update file streamer readme and fix typos

* typo

* fix: state listener observe writes at wrong time

Closes: #13457

Currently the state listener is notified when the cache store writes, which
happens only in the commit event; this breaks the current design.
The solution (as discussed in the issue) is to listen to state writes on the
rootmulti store only.
It also changes the file streamer to output a single data file for the writes
in the whole block, since we can't distinguish writes from the different
stages of ABCI events.

It adds new config items for the file streamer:
- streamers.file.output-metadata
- streamers.file.stop-node-on-error
- streamers.file.fsync

synchronous abci call, and format doc

fix comment

update file streamer readme and fix typos

typo

* improve UX of file streamer, make it immediately usable once enabled

- set a default value for write_dir.
- make write_dir based on the home directory by default.
- auto-create the directory if it does not exist.
* get homePath from opts

Co-authored-by: Marko 
(cherry picked from commit 1f91ee2ee941fd9a1dd4bc3acecd753e3cb7e237)

# Conflicts:
#	CHANGELOG.md
#	api/cosmos/base/store/v1beta1/listening.pulsar.go
#	baseapp/abci.go
#	baseapp/streaming.go
#	docs/architecture/adr-038-state-listening.md
#	server/config/config.go
#	server/config/toml.go
#	simapp/app.go
#	simapp/app_v2.go
#	store/cachemulti/store.go
#	store/iavl/store.go
#	store/mem/store.go
#	store/rootmulti/store.go
#	store/streaming/constructor.go
#	store/streaming/constructor_test.go
#	store/streaming/file/README.md
#	store/streaming/file/service.go
#	store/streaming/file/service_test.go
#	store/types/listening.pb.go
#	store/types/store.go

* `make proto-gen`, update changelog, delete unnecessary files

* fix conflicts

* fix GetConfig

Co-authored-by: yihuang 
Co-authored-by: Julien Robert 
---
 CHANGELOG.md                                 |  14 +
 baseapp/abci.go                              |  51 +-
 baseapp/streaming.go                         |  14 +-
 docs/architecture/adr-038-state-listening.md | 769 ++++++++-----------
 go.mod                                       |   2 +-
 server/config/config.go                      | 143 ++--
 server/config/toml.go                        |  20 +
 simapp/app.go                                |   6 +-
 store/cachekv/store.go                       |   1 -
 store/cachemulti/store.go                    |  19 +-
 store/dbadapter/store.go                     |  11 +-
 store/iavl/store.go                          |   3 +-
 store/iavl/store_test.go                     |  45 --
 store/listenkv/store.go                      |   7 +
 store/mem/store.go                           |  13 +-
 store/prefix/store.go                        |   8 +-
 store/rootmulti/store.go                     |  31 +-
 store/rootmulti/store_test.go                |   3 -
 store/streaming/constructor.go               |  68 +-
 store/streaming/constructor_test.go          |  74 +-
 store/streaming/file/README.md               | 101 ++-
 store/streaming/file/service.go              | 315 +++-----
 store/streaming/file/service_test.go         | 215 ++----
 store/types/listening.go                     |  34 +
 store/types/listening.pb.go                  | 563 ++++++++++++--
 store/types/store.go                         |  10 +-
 26 files changed, 1407 insertions(+), 1133 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4042bef3fd27..9ea9b7c7597b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -44,6 +44,20 @@ Ref: https://keepachangelog.com/en/1.0.0/
 * (store) [#11646](https://github.com/cosmos/cosmos-sdk/pull/11646) Add store name in tracekv-emitted store traces
 * (deps) Bump Tendermint version to [v0.34.24](https://github.com/tendermint/tendermint/releases/tag/v0.34.24).
 
+### API Breaking Changes
+
+* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Update State Streaming APIs:
+    * Add method `ListenCommit` to `ABCIListener`
+    * Move `ListeningEnabled` and `AddListener` methods to `CommitMultiStore`
+    * Remove `CacheWrapWithListeners` from `CacheWrap` and `CacheWrapper` interfaces
+    * Remove listening APIs from the caching layer (it should only listen to the `rootmulti.Store`)
+    * Add three new options to the file streaming service constructor.
+    * Modify `ABCIListener` such that any error from any method will always halt the app via `panic`
+
+### Bug Fixes
+
+* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Fix state listener that was observing writes at the wrong time.
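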
+
 ## v0.45.11 - 2022-11-09
 
 ### Improvements
diff --git a/baseapp/abci.go b/baseapp/abci.go
index c6265cf81d84..8cd999a95a45 100644
--- a/baseapp/abci.go
+++ b/baseapp/abci.go
@@ -228,8 +228,9 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
 
 	// call the hooks with the BeginBlock messages
 	for _, streamingListener := range app.abciListeners {
-		if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
-			app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
+		goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
+		if err := streamingListener.ListenBeginBlock(goCtx, req, res); err != nil {
+			panic(fmt.Errorf("BeginBlock listening hook failed, height: %d, err: %w", req.Header.Height, err))
 		}
 	}
 
@@ -257,8 +258,9 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
 
 	// call the streaming service hooks with the EndBlock messages
 	for _, streamingListener := range app.abciListeners {
-		if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
-			app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
+		goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
+		if err := streamingListener.ListenEndBlock(goCtx, req, res); err != nil {
+			panic(fmt.Errorf("EndBlock listening hook failed, height: %d, err: %w", req.Height, err))
 		}
 	}
 
@@ -311,8 +313,9 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
 
 	defer func() {
 		for _, streamingListener := range app.abciListeners {
-			if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
-				app.logger.Error("DeliverTx listening hook failed", "err", err)
+			goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
+			if err := streamingListener.ListenDeliverTx(goCtx, req, res); err != nil {
+				panic(fmt.Errorf("DeliverTx listening hook failed: %w", err))
 			}
 		}
 	}()
@@ -351,20 +354,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
 // defined in config, Commit will execute a deferred function call to check
 // against that height and gracefully halt if it matches the latest committed
 // height.
-func (app *BaseApp) Commit() (res abci.ResponseCommit) {
-	res, snapshotHeight := app.CommitWithoutSnapshot()
-
-	if snapshotHeight > 0 {
-		go app.Snapshot(snapshotHeight)
-	}
-
-	return res
-}
-
-// CommitWithoutSnapshot is like Commit but returns the snapshot information instead
-// of starting the snapshot goroutine
-// It can be used by apps to synchronize snapshots according to their requirements.
-func (app *BaseApp) CommitWithoutSnapshot() (res abci.ResponseCommit, snapshotHeight int64) {
+func (app *BaseApp) Commit() abci.ResponseCommit {
 	defer telemetry.MeasureSince(time.Now(), "abci", "commit")
 
 	header := app.deliverState.ctx.BlockHeader()
@@ -375,6 +365,20 @@ func (app *BaseApp) CommitWithoutSnapshot() (res abci.ResponseCommit, snapshotHe
 	// MultiStore (app.cms) so when Commit() is called it persists those values.
 	app.deliverState.ms.Write()
 	commitID := app.cms.Commit()
+
+	res := abci.ResponseCommit{
+		Data:         commitID.Hash,
+		RetainHeight: retainHeight,
+	}
+
+	// call the hooks with the Commit message
+	for _, streamingListener := range app.abciListeners {
+		goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
+		if err := streamingListener.ListenCommit(goCtx, res); err != nil {
+			panic(fmt.Errorf("Commit listening hook failed, height: %d, err: %w", header.Height, err))
+		}
+	}
+
 	app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))
 
 	// Reset the Check state to the latest committed.
@@ -408,12 +412,7 @@ func (app *BaseApp) CommitWithoutSnapshot() (res abci.ResponseCommit, snapshotHe
 		snapshotHeight = header.Height
 	}
 
-	res = abci.ResponseCommit{
-		Data:         commitID.Hash,
-		RetainHeight: retainHeight,
-	}
-
-	return res, snapshotHeight
+	return res
 }
 
 // halt attempts to gracefully shutdown the node via SIGINT and SIGTERM falling
diff --git a/baseapp/streaming.go b/baseapp/streaming.go
index 39e0f1ca6e9b..b8b382ae05b3 100644
--- a/baseapp/streaming.go
+++ b/baseapp/streaming.go
@@ -1,23 +1,27 @@
 package baseapp
 
 import (
+	"context"
 	"io"
 	"sync"
 
 	abci "github.com/tendermint/tendermint/abci/types"
 
 	store "github.com/cosmos/cosmos-sdk/store/types"
-	"github.com/cosmos/cosmos-sdk/types"
 )
 
-// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
+// ABCIListener interface used to hook into the ABCI message processing of the BaseApp.
+// The error results are propagated to the consensus state machine;
+// if you don't want to affect consensus, handle the errors internally and always return `nil` in these APIs.
 type ABCIListener interface {
 	// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
-	ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
+	ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
 	// ListenEndBlock updates the streaming service with the latest EndBlock messages
-	ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
+	ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
 	// ListenDeliverTx updates the streaming service with the latest DeliverTx messages
-	ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
+	ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
+	// ListenCommit updates the streaming service with the latest Commit event
+	ListenCommit(ctx context.Context, res abci.ResponseCommit) error
 }
 
 // StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md
index 3c360ede4751..bcd7945cd07b 100644
--- a/docs/architecture/adr-038-state-listening.md
+++ b/docs/architecture/adr-038-state-listening.md
@@ -3,12 +3,11 @@
 ## Changelog
 
 * 11/23/2020: Initial draft
-* 10/06/2022: Introduce plugin system based on hashicorp/go-plugin
 * 10/14/2022:
-    * Add `ListenCommit`, flatten the state writes in a block to a single batch.
-    * Remove listeners from cache stores, should only listen to `rootmulti.Store`.
-    * Remove `HaltAppOnDeliveryError()`, the errors are propagated by default, the implementations should return nil if they don't want to propagate errors.
-* 26/05/2023: Update with ABCI 2.0
+    * Add `ListenCommit`, flatten the state writes in a block to a single batch.
+    * Remove listeners from cache stores, should only listen to `rootmulti.Store`.
+    * Remove `HaltAppOnDeliveryError()`, the errors are propagated by default, the implementations should return nil if they don't want to propagate errors.
+
 
 ## Status
 
@@ -38,9 +37,9 @@ The `MemoryListener` will be used internally by the concrete `rootmulti` impleme
 // WriteListener interface for streaming data out from a listenkv.Store
 type WriteListener interface {
 	// if value is nil then it was deleted
-	// storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores
+	// storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores
 	// delete bool indicates if it was a delete; true: delete, false: set
-	OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
+	OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
 }
 
 // NewMemoryListener creates a listener that accumulates the state writes in memory.
@@ -58,13 +57,8 @@ func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, d
 	})
 }
 
-// PopStateCache returns the current state caches and set to nil
-func (fl *MemoryListener) PopStateCache() []StoreKVPair {
-	res := fl.stateCache
-	fl.stateCache = nil
-	return res
-}
-```
+We will create two concrete implementations of the `WriteListener` interface in `store/types/listening.go`: one that writes out protobuf-encoded
+KV pairs to an underlying `io.Writer`, and one that simply accumulates them in memory.
 
 We will also define a protobuf type for the KV pairs. In addition to the key and value fields this message
 will include the StoreKey for the originating KVStore so that we can collect information from separate KVStores
 and determine the source of each KV pair.
 
@@ -78,6 +72,76 @@ message StoreKVPair {
 }
 ```
 
+```go
+// StoreKVPairWriteListener is used to configure listening to a KVStore by writing out length-prefixed
+// protobuf encoded StoreKVPairs to an underlying io.Writer
+type StoreKVPairWriteListener struct {
+	writer     io.Writer
+	marshaller codec.BinaryCodec
+}
+
+// NewStoreKVPairWriteListener creates a StoreKVPairWriteListener with a provided io.Writer and codec.BinaryCodec
+func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairWriteListener {
+	return &StoreKVPairWriteListener{
+		writer:     w,
+		marshaller: m,
+	}
+}
+
+// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs
+func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error {
+	kvPair := new(types.StoreKVPair)
+	kvPair.StoreKey = storeKey.Name()
+	kvPair.Delete = delete
+	kvPair.Key = key
+	kvPair.Value = value
+	by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair)
+	if err != nil {
+		return err
+	}
+	if _, err := wl.writer.Write(by); err != nil {
+		return err
+	}
+	return nil
+}
+```
+
+```golang
+// MemoryListener listens to the state writes and accumulates the records in memory.
+type MemoryListener struct {
+	key        StoreKey
+	stateCache []StoreKVPair
+}
+
+// NewMemoryListener creates a listener that accumulates the state writes in memory.
+func NewMemoryListener(key StoreKey) *MemoryListener {
+	return &MemoryListener{key: key}
+}
+
+// OnWrite implements WriteListener interface
+func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error {
+	fl.stateCache = append(fl.stateCache, StoreKVPair{
+		StoreKey: storeKey.Name(),
+		Delete:   delete,
+		Key:      key,
+		Value:    value,
+	})
+	return nil
+}
+
+// PopStateCache returns the current state cache and sets it to nil
+func (fl *MemoryListener) PopStateCache() []StoreKVPair {
+	res := fl.stateCache
+	fl.stateCache = nil
+	return res
+}
+
+// StoreKey returns the storeKey it listens to
+func (fl *MemoryListener) StoreKey() StoreKey {
+	return fl.key
+}
+```
 
 ### ListenKVStore
 
 We will create a new `Store` type `listenkv.Store` that the `rootmulti` store will use to wrap a `KVStore` to enable state listening.
 We will configure the `Store` with a `MemoryListener` which will collect state changes for output to specific destinations.
 
 ```go
 // Store implements the KVStore interface with listening enabled.
 // Operations are traced on each core KVStore call and written to any of the
 // underlying listeners with the proper key and operation permissions
 type Store struct {
 	parent         types.KVStore
-	listener       *types.MemoryListener
+	listeners      []types.WriteListener
 	parentStoreKey types.StoreKey
 }
 
 // NewStore returns a reference to a new listenkv.Store given a parent
 // KVStore implementation, a store key, and the listeners to notify.
-func NewStore(parent types.KVStore, psk types.StoreKey, listener *types.MemoryListener) *Store {
-	return &Store{parent: parent, listener: listener, parentStoreKey: psk}
+func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteListener) *Store {
+	return &Store{parent: parent, listeners: listeners, parentStoreKey: psk}
 }
 
 // Set implements the KVStore interface. It traces a write operation and
@@ -104,38 +168,47 @@ func NewStore(parent types.KVStore, psk types.StoreKey, listener *types.MemoryLi
 func (s *Store) Set(key []byte, value []byte) {
 	types.AssertValidKey(key)
 	s.parent.Set(key, value)
-	s.listener.OnWrite(s.parentStoreKey, key, value, false)
+	s.onWrite(false, key, value)
 }
 
 // Delete implements the KVStore interface. It traces a write operation and
 // delegates the Delete call to the parent KVStore.
 func (s *Store) Delete(key []byte) {
 	s.parent.Delete(key)
-	s.listener.OnWrite(s.parentStoreKey, key, nil, true)
+	s.onWrite(true, key, nil)
+}
+
+// onWrite writes a KVStore operation to all the WriteListeners
+func (s *Store) onWrite(delete bool, key, value []byte) {
+	for _, l := range s.listeners {
+		if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil {
+			// log error
+		}
+	}
 }
 ```
 
 ### MultiStore interface updates
 
-We will update the `CommitMultiStore` interface to allow us to wrap a `MemoryListener` to a specific `KVStore`.
-Note that the `MemoryListener` will be attached internally by the concrete `rootmulti` implementation.
+We will update the `CommitMultiStore` interface to allow us to wrap a set of listeners around a specific `KVStore`.
 
 ```go
 type CommitMultiStore interface {
 	...
-	// AddListeners adds a listener for the KVStore belonging to the provided StoreKey
-	AddListeners(keys []StoreKey)
+	// ListeningEnabled returns if listening is enabled for the KVStore belonging to the provided StoreKey
+	ListeningEnabled(key StoreKey) bool
 
-	// PopStateCache returns the accumulated state change messages from MemoryListener
-	PopStateCache() []StoreKVPair
+	// AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey
+	// It appends the listeners to a current set, if one already exists
+	AddListeners(key StoreKey, listeners []WriteListener)
 }
 ```
 
 ### MultiStore implementation updates
 
-We will adjust the `rootmulti` `GetKVStore` method to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on for that `Store`.
+We will modify all of the `CommitMultiStore` implementations to satisfy these new interfaces, and adjust the `rootmulti` `GetKVStore` method
+to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on for that `Store`.
 
 ```go
 func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
@@ -145,294 +218,68 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
 		store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext)
 	}
 	if rs.ListeningEnabled(key) {
-		store = listenkv.NewStore(store, key, rs.listeners[key])
+		store = listenkv.NewStore(store, key, rs.listeners[key])
 	}
 
 	return store
}
 ```
 
-We will implement `AddListeners` to manage KVStore listeners internally and implement `PopStateCache`
-as a means of retrieving the current state.
-
-```go
-// AddListeners adds a state change listener for a specific KVStore
-func (rs *Store) AddListeners(keys []types.StoreKey) {
-	listener := types.NewMemoryListener()
-	for i := range keys {
-		rs.listeners[keys[i]] = listener
-	}
-}
-```
-
-```go
-func (rs *Store) PopStateCache() []types.StoreKVPair {
-	var cache []types.StoreKVPair
-	for _, ls := range rs.listeners {
-		cache = append(cache, ls.PopStateCache()...)
-	}
-	sort.SliceStable(cache, func(i, j int) bool {
-		return cache[i].StoreKey < cache[j].StoreKey
-	})
-	return cache
-}
-```
-
-We will also adjust the `rootmulti` `CacheMultiStore` and `CacheMultiStoreWithVersion` methods to enable listening in
-the cache layer.
+We will also adjust the `rootmulti` `CacheMultiStore` method to wrap the stores with `listenkv.Store` to enable listening when the cache layer writes.
 
 ```go
 func (rs *Store) CacheMultiStore() types.CacheMultiStore {
-	stores := make(map[types.StoreKey]types.CacheWrapper)
-	for k, v := range rs.stores {
-		store := v.(types.KVStore)
-		// Wire the listenkv.Store to allow listeners to observe the writes from the cache store,
-		// set same listeners on cache store will observe duplicated writes.
-		if rs.ListeningEnabled(k) {
-			store = listenkv.NewStore(store, k, rs.listeners[k])
-		}
-		stores[k] = store
-	}
-	return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.getTracingContext())
-}
-```
-
-```go
-func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStore, error) {
-	// ...
-
-		// Wire the listenkv.Store to allow listeners to observe the writes from the cache store,
-		// set same listeners on cache store will observe duplicated writes.
-		if rs.ListeningEnabled(key) {
-			cacheStore = listenkv.NewStore(cacheStore, key, rs.listeners[key])
-		}
-
-		cachedStores[key] = cacheStore
-	}
-
-	return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.getTracingContext()), nil
+	stores := make(map[types.StoreKey]types.CacheWrapper)
+	for k, v := range rs.stores {
+		store := v.(types.KVStore)
+		// Wire the listenkv.Store to allow listeners to observe the writes from the cache store;
+		// setting the same listeners on the cache store would observe duplicated writes.
+		if rs.ListeningEnabled(k) {
+			store = listenkv.NewStore(store, k, rs.listeners[k])
+		}
+		stores[k] = store
+	}
+	return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.getTracingContext())
 }
 ```
 
 ### Exposing the data
 
+#### Streaming service
+
 We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers.
-In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs into the BaseApp
-and relays ABCI requests and responses so that the service can group the state changes with the ABCI requests that affected them and the ABCI responses they affected.
+In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs
+into the BaseApp and relays ABCI requests and responses so that the service can observe that block metadata as well.
+
+The `WriteListener`s of `StreamingService` listen to the `rootmulti.Store`, which is only written to at the commit event by the cache store of `deliverState`.
 
 ```go
 // ABCIListener interface used to hook into the ABCI message processing of the BaseApp
 type ABCIListener interface {
-	// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
-	ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
-	// ListenEndBlock updates the streaming service with the latest EndBlock messages
-	ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
-	// ListenDeliverTx updates the streaming service with the latest DeliverTx messages
-	ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
+	// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
+	ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
+	// ListenEndBlock updates the streaming service with the latest EndBlock messages
+	ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
+	// ListenDeliverTx updates the streaming service with the latest DeliverTx messages
+	ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
+	// ListenCommit updates the streaming service with the latest Commit message;
+	// all the state writes of the current block should have been notified before this message.
+	ListenCommit(ctx types.Context, res abci.ResponseCommit) error
 }
 
 // StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
 type StreamingService interface {
-	// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
-	Stream(wg *sync.WaitGroup) error
-	// Listeners returns the streaming service's listeners for the BaseApp to register
-	Listeners() map[types.StoreKey][]store.WriteListener
-	// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
-	ABCIListener
-	// Closer interface
-	io.Closer
-}
-```
-
-#### BaseApp Registration
-
-We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s:
-
-Writing to a file is the simplest approach for streaming the data out to consumers.
-This approach also provides the advantages of being persistent and durable, and the files can be read directly,
-or auxiliary streaming services can read from the files and serve the data over a remote interface.
-
-##### Encoding
-
-For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number.
-At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written.
-At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written.
-In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as
-a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
-is configured to listen to.
-
-For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M
-is the tx number in the block (i.e. 0, 1, 2...).
-At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written.
-At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written.
-In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as
-a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
-is configured to listen to.
-
-For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number.
-At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written.
-At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written.
-In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as
-a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
-is configured to listen to.
-
-##### Decoding
-
-To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto
-messages based on the length-prefixing of each message. Once segmented, it is known that the first message is the ABCI request,
-the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into
-the appropriate message type.
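[Editor's note: to make the segmentation step above concrete, here is a minimal decoding sketch. It is illustrative only and not part of this patch; it assumes the uvarint length prefix produced by `codec.BinaryCodec.MarshalBinaryLengthPrefixed`, and `splitLengthPrefixed` is a hypothetical helper name.]

```go
import (
	"encoding/binary"
	"fmt"
)

// splitLengthPrefixed segments raw file bytes into individual protobuf
// messages based on their uvarint length prefixes. Per the encoding above,
// the first segment is the ABCI request, the last is the ABCI response, and
// every segment in between is a StoreKVPair.
func splitLengthPrefixed(bz []byte) ([][]byte, error) {
	var segments [][]byte
	for len(bz) > 0 {
		size, n := binary.Uvarint(bz)
		if n <= 0 || uint64(len(bz)-n) < size {
			return nil, fmt.Errorf("malformed length prefix")
		}
		segments = append(segments, bz[n:n+int(size)])
		bz = bz[n+int(size):]
	}
	return segments, nil
}
```

Each returned segment can then be unmarshalled into the message type implied by its position in the file.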
-
-The type of ABCI req/res, the block height, and the transaction index (where relevant) are known
-from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message.
-
-##### Implementation example
-
-```go
-type BaseApp struct {
-
-	...
-
-	// abciListenersAsync for determining if abciListeners will run asynchronously.
-	// When abciListenersAsync=false and stopNodeOnABCIListenerErr=false listeners will run synchronized but will not stop the node.
-	// When abciListenersAsync=true stopNodeOnABCIListenerErr will be ignored.
-	abciListenersAsync bool
-
-	// stopNodeOnABCIListenerErr halts the node when ABCI streaming service listening results in an error.
-	// stopNodeOnABCIListenerErr=true must be paired with abciListenersAsync=false.
-	stopNodeOnABCIListenerErr bool
-}
-```
-
-#### ABCI Event Hooks
-
-We will modify the `FinalizeBlock` and `Commit` methods to pass ABCI requests and responses
-to any streaming service hooks registered with the `BaseApp`.
-
-```go
-func (app *BaseApp) FinalizeBlock(req abci.RequestFinalizeBlock) abci.ResponseFinalizeBlock {
-
-	var abciRes abci.ResponseFinalizeBlock
-	defer func() {
-		// call the streaming service hook with the FinalizeBlock messages
-		for _, abciListener := range app.abciListeners {
-			ctx := app.finalizeState.ctx
-			blockHeight := ctx.BlockHeight()
-			if app.abciListenersAsync {
-				go func(req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) {
-					if err := abciListener.ListenFinalizeBlock(blockHeight, req, res); err != nil {
-						app.logger.Error("FinalizeBlock listening hook failed", "height", blockHeight, "err", err)
-					}
-				}(req, abciRes)
-			} else {
-				if err := abciListener.ListenFinalizeBlock(blockHeight, req, abciRes); err != nil {
-					app.logger.Error("FinalizeBlock listening hook failed", "height", blockHeight, "err", err)
-					if app.stopNodeOnABCIListenerErr {
-						os.Exit(1)
-					}
-				}
-			}
-		}
-	}()
-
-	...
-
-	return abciRes
-}
-```
-
-```go
-func (app *BaseApp) Commit() abci.ResponseCommit {
-
-	...
-
-	res := abci.ResponseCommit{
-		Data:         commitID.Hash,
-		RetainHeight: retainHeight,
-	}
-
-	// call the streaming service hook with the Commit messages
-	for _, abciListener := range app.abciListeners {
-		ctx := app.deliverState.ctx
-		blockHeight := ctx.BlockHeight()
-		changeSet := app.cms.PopStateCache()
-		if app.abciListenersAsync {
-			go func(res abci.ResponseCommit, changeSet []store.StoreKVPair) {
-				if err := abciListener.ListenCommit(ctx, res, changeSet); err != nil {
-					app.logger.Error("ListenCommit listening hook failed", "height", blockHeight, "err", err)
-				}
-			}(res, changeSet)
-		} else {
-			if err := abciListener.ListenCommit(ctx, res, changeSet); err != nil {
-				app.logger.Error("ListenCommit listening hook failed", "height", blockHeight, "err", err)
-				if app.stopNodeOnABCIListenerErr {
-					os.Exit(1)
-				}
-			}
-		}
-	}
-
-	...
-
-	return res
-}
-```
-
-#### Go Plugin System
-
-We propose a plugin architecture to load and run `Streaming` plugins and other types of implementations. We will introduce a plugin
-system over gRPC that is used to load and run Cosmos-SDK plugins. The plugin system uses [hashicorp/go-plugin](https://github.com/hashicorp/go-plugin).
-Each plugin must have a struct that implements the `plugin.Plugin` interface and an `Impl` interface for processing messages over gRPC.
-Each plugin must also have a message protocol defined for the gRPC service:
-
-```go
-// streaming/plugins/abci/{plugin_version}/interface.go
-
-// Handshake is a common handshake that is shared by streaming and host.
-// This prevents users from executing bad plugins or executing a plugin
-// directory. It is a UX feature, not a security feature.
-var Handshake = plugin.HandshakeConfig{
-	ProtocolVersion:  1,
-	MagicCookieKey:   "ABCI_LISTENER_PLUGIN",
-	MagicCookieValue: "ef78114d-7bdf-411c-868f-347c99a78345",
-}
-
-// ABCIListenerPlugin is the base struct for all kinds of go-plugin implementations
-// It will be included in interfaces of different Plugins
-type ABCIListenerPlugin struct {
-	// GRPCPlugin must still implement the Plugin interface
-	plugin.Plugin
-	// Concrete implementation, written in Go. This is only used for plugins
-	// that are written in Go.
-	Impl baseapp.ABCIListener
-}
-```
-
-#### Auxiliary streaming service
-
-The `plugin.Plugin` interface has two methods, `Client` and `Server`. For our GRPC service these are `GRPCClient` and `GRPCServer`.
-The `Impl` field holds the concrete implementation of our `baseapp.ABCIListener` interface written in Go.
-Note: this is only used for plugin implementations written in Go.
-
-The advantage of having such a plugin system is that within each plugin authors can define the message protocol in a way that fits their use case.
-For example, when state change listening is desired, the `ABCIListener` message protocol can be defined as below (*for illustrative purposes only*).
-When state change listening is not desired, then `ListenCommit` can be omitted from the protocol.
-
-```protobuf
-syntax = "proto3";
-
-...
-
-message Empty {}
-
-message ListenFinalizeBlockRequest {
-	RequestFinalizeBlock req = 1;
-	ResponseFinalizeBlock res = 2;
-}
-message ListenCommitRequest {
-	int64 block_height = 1;
-	ResponseCommit res = 2;
-	repeated StoreKVPair changeSet = 3;
-}
+
+#### BaseApp registration
 
 // plugin that listens to state changes
 service ABCIListenerService {
 	rpc ListenFinalizeBlock(ListenFinalizeBlockRequest) returns (Empty);
 	rpc ListenCommit(ListenCommitRequest) returns (Empty);
 }
 ```
 
 ```go
-// SetStreamingService is used to register a streaming service with the BaseApp
+// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
 func (app *BaseApp) SetStreamingService(s StreamingService) {
-	// set the listeners for each StoreKey
+	// add the listeners for each StoreKey
 	for key, lis := range s.Listeners() {
 		app.cms.AddListeners(key, lis)
 	}
-	// register the streaming service hooks within the BaseApp
-	// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks
-	app.hooks = append(app.hooks, s)
+	// register the StreamingService within the BaseApp
+	// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
+	app.abciListeners = append(app.abciListeners, s)
 }
 ```
 
@@ -463,10 +310,14 @@ var (
 	_ baseapp.ABCIListener = (*GRPCClient)(nil)
 )
 
-// GRPCClient is an implementation of the ABCIListener and ABCIListenerPlugin interfaces that talks over RPC.
-type GRPCClient struct {
-	client ABCIListenerServiceClient
-}
+	defer func() {
+		// call the hooks with the BeginBlock messages
+		for _, streamingListener := range app.abciListeners {
+			if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
+				panic(sdkerrors.Wrapf(err, "BeginBlock listening hook failed, height: %d", req.Header.Height))
+			}
+		}
+	}()
 
 func (m *GRPCClient) ListenFinalizeBlock(goCtx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error {
 	ctx := sdk.UnwrapSDKContext(goCtx)
@@ -486,130 +337,144 @@ type GRPCServer struct {
 	Impl baseapp.ABCIListener
 }
 
-func (m *GRPCServer) ListenFinalizeBlock(ctx context.Context, req *ListenFinalizeBlockRequest) (*Empty, error) {
-	return &Empty{}, m.Impl.ListenFinalizeBlock(ctx, req.Req, req.Res)
-}
+	defer func() {
+		// Call the streaming service hooks with the EndBlock messages
+		for _, streamingListener := range app.abciListeners {
+			if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
+				panic(sdkerrors.Wrapf(err, "EndBlock listening hook failed, height: %d", req.Height))
+			}
+		}
+	}()
 
 func (m *GRPCServer) ListenCommit(ctx context.Context, req *ListenCommitRequest) (*Empty, error) {
 	return &Empty{}, m.Impl.ListenCommit(ctx, req.Res, req.ChangeSet)
}
-```
+
+```go
+func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
+
+	defer func() {
+		// call the hooks with the DeliverTx messages
+		for _, streamingListener := range app.abciListeners {
+			if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
+				panic(sdkerrors.Wrap(err, "DeliverTx listening hook failed"))
+			}
+		}
+	}()
 
 And the pre-compiled Go plugin `Impl` (*this is only used for plugins that are written in Go*):
 
+	return res
+}
+```
 
-```go
-// streaming/plugins/abci/{plugin_version}/impl/plugin.go
-
-// Plugins are pre-compiled and loaded by the plugin system
+```golang
+func (app *BaseApp) Commit() abci.ResponseCommit {
+	header := app.deliverState.ctx.BlockHeader()
+	retainHeight := app.GetBlockRetentionHeight(header.Height)
+
+	// Write the DeliverTx state into branched storage and commit the MultiStore.
+	// The write to the DeliverTx state writes all state transitions to the root
+	// MultiStore (app.cms) so when Commit() is called it persists those values.
+	app.deliverState.ms.Write()
+	commitID := app.cms.Commit()
+
+	res := abci.ResponseCommit{
+		Data:         commitID.Hash,
+		RetainHeight: retainHeight,
+	}
 
-// ABCIListener is the implementation of the baseapp.ABCIListener interface
-type ABCIListener struct{}
+	// call the hooks with the Commit message
-func (m *ABCIListenerPlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error {
-	// send data to external system
+	for _, streamingListener := range app.abciListeners {
+		if err := streamingListener.ListenCommit(app.deliverState.ctx, res); err != nil {
+			panic(sdkerrors.Wrapf(err, "Commit listening hook failed, height: %d", header.Height))
+		}
+	}
 
-func (m *ABCIListenerPlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []store.StoreKVPair) error {
-	// send data to external system
-}
+	app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))
+	...
+}
+```
 
+#### Error Handling And Async Consumers
-func main() {
-	plugin.Serve(&plugin.ServeConfig{
-		HandshakeConfig: grpc_abci_v1.Handshake,
-		Plugins: map[string]plugin.Plugin{
-			"grpc_plugin_v1": &grpc_abci_v1.ABCIListenerGRPCPlugin{Impl: &ABCIListenerPlugin{}},
-		},
+
+`ABCIListener`s are called synchronously inside the consensus state machine; a returned error causes a panic, which in turn halts the consensus state machine. The implementer should be careful not to break consensus unexpectedly or slow it down too much.
+
+For some async use cases, one can spawn a goroutine internally to avoid slowing down the consensus state machine, handle the errors internally, and always return `nil` to avoid halting the consensus state machine on error; a sketch of this pattern follows below.
-```toml
-[store]
-    streamers = [ # if len(streamers) > 0 we are streaming
-        "file",
-    ]
-
-[streamers]
-    [streamers.file]
-        keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
-        write_dir = "path to the write directory"
-        prefix = "optional prefix to prepend to the generated file names"
+
+Furthermore, in most cases we only need the builtin file streamer to listen to state changes directly inside cosmos-sdk; the other consumers should subscribe to the file streamer's output externally.
+
+#### File Streamer
+
+We provide a minimal filesystem-based implementation inside cosmos-sdk, with options to write the output files reliably. The output files can be further consumed by external consumers, so most state listeners don't actually need to live inside the sdk and node, which improves node robustness and simplifies sdk internals.
+
+The file streamer can be wired into the app like this:
+
+```golang
+exposeStoreKeys := ... // decide the list of store keys to listen to
+service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec, logger)
+bApp.SetStreamingService(service)
+```
 
-We will introduce a plugin loading system that will return `(interface{}, error)`.
-This provides the advantage of using versioned plugins where the plugin interface and gRPC protocol change over time.
-In addition, it allows for building independent plugins that can expose different parts of the system over gRPC.
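[Editor's note: the non-halting async pattern described above can be sketched as follows. This is illustrative only and not part of this patch; the `NonHaltingListener` type is hypothetical, and it assumes the `context.Context`-based `ABCIListener` interface introduced in `baseapp/streaming.go` above.]

```go
import (
	"context"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/libs/log"

	"github.com/cosmos/cosmos-sdk/baseapp"
)

// NonHaltingListener wraps an inner ABCIListener, delegates to it in a
// goroutine, and always returns nil, so a slow or failing consumer can never
// panic or stall the consensus state machine.
type NonHaltingListener struct {
	inner  baseapp.ABCIListener
	logger log.Logger
}

func (l NonHaltingListener) ListenCommit(ctx context.Context, res abci.ResponseCommit) error {
	go func() {
		if err := l.inner.ListenCommit(ctx, res); err != nil {
			// Handled internally; never propagated back to consensus.
			l.logger.Error("ListenCommit failed", "err", err)
		}
	}()
	return nil
}

// ListenBeginBlock, ListenEndBlock, and ListenDeliverTx would follow the same pattern.
```

Note that with this pattern the writes for a block may land in the consumer after the block has been committed, so it trades the data-integrity guarantee of `stop-node-on-error` for liveness.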
+#### Plugin system -Each configured streamer will receive the +We propose a plugin architecture to load and run `StreamingService` implementations. We will introduce a plugin +loading/preloading system that is used to load, initialize, inject, run, and stop Cosmos-SDK plugins. Each plugin +must implement the following interface: ```go -// ServiceConstructor is used to construct a streaming service -type ServiceConstructor func(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) +// Plugin is the base interface for all kinds of cosmos-sdk plugins +// It will be included in interfaces of different Plugins +type Plugin interface { + // Name should return unique name of the plugin + Name() string -// ServiceType enum for specifying the type of StreamingService -type ServiceType int + // Version returns current version of the plugin + Version() string -const ( - Unknown ServiceType = iota - File - // add more in the future -) + // Init is called once when the Plugin is being loaded + // The plugin is passed the AppOptions for configuration + // A plugin will not necessarily have a functional Init + Init(env serverTypes.AppOptions) error -// NewStreamingServiceType returns the streaming.ServiceType corresponding to the provided name -func NewStreamingServiceType(name string) ServiceType { - switch strings.ToLower(name) { - case "file", "f": - return File - default: - return Unknown - } -} - -// String returns the string name of a streaming.ServiceType -func (sst ServiceType) String() string { - switch sst { - case File: - return "file" - default: - return "" - } -} - -// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors -var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{ - File: NewFileStreamingService, -} - -// ServiceTypeFromString returns the streaming.ServiceConstructor corresponding to the provided name -func ServiceTypeFromString(name string) (ServiceConstructor, error) { - ssType := NewStreamingServiceType(name) - if ssType == Unknown { - return nil, fmt.Errorf("unrecognized streaming service name %s", name) - } - if constructor, ok := ServiceConstructorLookupTable[ssType]; ok { - return constructor, nil - } - return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) -} - -// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService -func NewFileStreamingService(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) { - filePrefix := cast.ToString(opts.Get("streamers.file.prefix")) - fileDir := cast.ToString(opts.Get("streamers.file.write_dir")) - return file.NewStreamingService(fileDir, filePrefix, keys, marshaller) + // Closer interface for shutting down the plugin process + io.Closer } ``` -The `NewStreamingPlugin` and `RegisterStreamingPlugin` functions are used to register a plugin with the App's BaseApp. +The `Name` method returns a plugin's name. +The `Version` method returns a plugin's version. +The `Init` method initializes a plugin with the provided `AppOptions`. +The io.Closer is used to shut down the plugin service. + +For the purposes of this ADR we introduce a single kind of plugin- a state streaming plugin. +We will define a `StateStreamingPlugin` interface which extends the above `Plugin` interface to support a state streaming service. 
+
+```go
+// StateStreamingPlugin interface for plugins that load a baseapp.StreamingService onto a baseapp.BaseApp
+type StateStreamingPlugin interface {
+	// Register configures and registers the plugin streaming service with the BaseApp
+	Register(bApp *baseapp.BaseApp, marshaller codec.BinaryCodec, keys map[string]*types.KVStoreKey) error
+
+	// Start starts the background streaming process of the plugin streaming service
+	Start(wg *sync.WaitGroup) error
+
+	// Plugin is the base Plugin interface
+	Plugin
+}
+```
+
+The `Register` method is used during App construction to register the plugin's streaming service with an App's BaseApp using the BaseApp's `SetStreamingService` method.
+The `Start` method is used during App construction to start the registered plugin streaming services and maintain synchronization with them.
 
 e.g. in `NewSimApp`:
 
 ```go
 func NewSimApp(
-	logger log.Logger,
-	db corestore.KVStoreWithBatch,
-	traceStore io.Writer,
-	loadLatest bool,
-	appOpts servertypes.AppOptions,
-	baseAppOptions ...func(*baseapp.BaseApp),
+	logger log.Logger,
+	db dbm.DB,
+	traceStore io.Writer,
+	loadLatest bool,
+	appOpts servertypes.AppOptions,
+	baseAppOptions ...func(*baseapp.BaseApp),
 ) *SimApp {
 
 	...
 
@@ -640,48 +505,36 @@ func NewSimApp(
 		}
 	}
 
-	return app
-```
-
-	// configure state listening capabilities using AppOptions
-	listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
-	for _, listenerName := range listeners {
-		// get the store keys allowed to be exposed for this streaming service
-		exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName)))
-		var exposeStoreKeys []sdk.StoreKey
-		if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys
-			exposeStoreKeys = make([]sdk.StoreKey, 0, len(keys))
-			for _, storeKey := range keys {
-				exposeStoreKeys = append(exposeStoreKeys, storeKey)
-			}
-		} else {
-			exposeStoreKeys = make([]sdk.StoreKey, 0, len(exposeKeyStrs))
-			for _, keyStr := range exposeKeyStrs {
-				if storeKey, ok := keys[keyStr]; ok {
-					exposeStoreKeys = append(exposeStoreKeys, storeKey)
-				}
-			}
-		}
-		if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything
-			continue
-		}
-		// get the constructor for this listener name
-		constructor, err := baseapp.NewStreamingServiceConstructor(listenerName)
+	keys := sdk.NewKVStoreKeys(
+		authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey,
+		minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey,
+		govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey,
+		evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey,
+	)
+
+	pluginsOnKey := fmt.Sprintf("%s.%s", plugin.PLUGINS_TOML_KEY, plugin.PLUGINS_ON_TOML_KEY)
+	if cast.ToBool(appOpts.Get(pluginsOnKey)) {
+		// this loads the preloaded and any plugins found in `plugins.dir`
+		pluginLoader, err := loader.NewPluginLoader(appOpts, logger)
 		if err != nil {
-			tmos.Exit(err.Error()) // or continue?
+			// handle error
 		}
-		// generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose
-		streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
-		if err != nil {
-			tmos.Exit(err.Error())
+
+		// initialize the loaded plugins
+		if err := pluginLoader.Initialize(); err != nil {
+			// handle error
 		}
-		// register the streaming service with the BaseApp
-		bApp.RegisterStreamingService(streamingService)
-		// waitgroup and quit channel for optional shutdown coordination of the streaming service
+
+		// register the plugin(s) with the BaseApp
+		if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil {
+			// handle error
+		}
+
+		// start the plugin services, optionally use wg to synchronize shutdown using io.Closer
 		wg := new(sync.WaitGroup)
-		quitChan := make(chan struct{})
-		// kick off the background streaming service loop
-		streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead?
+		if err := pluginLoader.Start(wg); err != nil {
+			// handle error
+		}
	}
 
 The plugin system will be configured within an App's TOML configuration files.
 
@@ -709,16 +562,52 @@ async = false
 stop-node-on-err = true
 ```
 
-There will be four parameters for configuring the `ABCIListener` plugin: `streaming.abci.plugin`, `streaming.abci.keys`, `streaming.abci.async` and `streaming.abci.stop-node-on-err`.
-`streaming.abci.plugin` is the name of the plugin we want to use for streaming, `streaming.abci.keys` is a set of store keys for stores it listens to,
-`streaming.abci.async` is a bool enabling asynchronous listening and `streaming.abci.stop-node-on-err` is a bool that stops the node when true and when operating
-in synchronized mode `streaming.abci.async=false`. Note that `streaming.abci.stop-node-on-err=true` will be ignored if `streaming.abci.async=true`.
-The configuration above supports additional streaming plugins by adding the plugin to the `[streaming]` configuration section
-and registering the plugin with the `RegisterStreamingPlugin` helper function.
+#### Configuration
+
+The plugin system will be configured within an app's app.toml file.
+
+```toml
+[plugins]
+    on = false # turn the plugin system, as a whole, on or off
+    enabled = ["list", "of", "plugin", "names", "to", "enable"]
+    dir = "the directory to load non-preloaded plugins from; defaults to cosmos-sdk/plugin/plugins"
+```
+
+There will be three parameters for configuring the plugin system: `plugins.on`, `plugins.enabled` and `plugins.dir`.
+`plugins.on` is a bool that turns on or off the plugin system at large, `plugins.dir` directs the system to a directory
+to load plugins from, and `plugins.enabled` provides `opt-in` semantics to plugin names to enable (including preloaded plugins).
+
+Configuration of a given plugin is ultimately specific to the plugin, but we will introduce some standards here:
+
+Plugin TOML configuration should be split into separate sub-tables for each kind of plugin (e.g. `plugins.streaming`).
+
+Within these sub-tables, the parameters for a specific plugin of that kind are included in another sub-table (e.g. `plugins.streaming.file`).
+It is generally expected, but not required, that a streaming service plugin can be configured with a set of store keys
+(e.g. `plugins.streaming.file.keys`) for the stores it listens to and a flag (e.g. `plugins.streaming.file.halt_app_on_delivery_error`)
+that signifies whether the service operates in a fire-and-forget capacity, or stops the BaseApp when an error occurs in
+any of `ListenBeginBlock`, `ListenEndBlock` and `ListenDeliverTx`.
 
-Note that each plugin must include the `streaming.{service}.plugin` property as it is a requirement for doing the lookup and registration of the plugin
-with the App. All other properties are unique to the individual services.
+e.g.
+
+```toml
+[plugins]
+    on = false # turn the plugin system, as a whole, on or off
+    enabled = ["list", "of", "plugin", "names", "to", "enable"]
+    dir = "the directory to load non-preloaded plugins from; defaults to "
+    [plugins.streaming] # a mapping of plugin-specific streaming service parameters, mapped to their plugin name
+        [plugins.streaming.file] # the specific parameters for the file streaming service plugin
+            keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
+            write_dir = "path to the write directory"
+            prefix = "optional prefix to prepend to the generated file names"
+            halt_app_on_delivery_error = "false" # false == fire-and-forget; true == stop the application
+        [plugins.streaming.kafka]
+            keys = []
+            topic_prefix = "block" # Optional prefix for topic names where data will be stored.
+            flush_timeout_ms = 5000 # Flush and wait for outstanding messages and requests to complete delivery when calling `StreamingService.Close()`. (milliseconds)
+            halt_app_on_delivery_error = true # Whether or not to halt the application when the plugin fails to deliver message(s).
+    ...
+```
 
 #### Encoding and decoding streams
 
@@ -734,7 +623,7 @@ These changes will provide a means of subscribing to KVStore state changes in re
 
 ### Backwards Compatibility
 
-* This ADR changes the `CommitMultiStore` interface; implementations supporting the previous version of this interface will not support the new one
+* This ADR changes the `CommitMultiStore` interface; implementations supporting the previous version of these interfaces will not support the new ones
 
 ### Positive
 
@@ -742,7 +631,7 @@ These changes will provide a means of subscribing to KVStore state changes in re
 
 ### Negative
 
-* Changes `CommitMultiStore` interface and its implementations
+* Changes `CommitMultiStore` interface
 
 ### Neutral
 
diff --git a/go.mod b/go.mod
index edf6e2807077..579ff92a6c66 100644
--- a/go.mod
+++ b/go.mod
@@ -24,7 +24,7 @@ require (
 	github.com/hashicorp/golang-lru v0.5.4
 	github.com/hdevalence/ed25519consensus v0.0.0-20210204194344-59a8610d2b87
 	github.com/improbable-eng/grpc-web v0.14.1
-	github.com/jhump/protoreflect v1.12.1-0.20220721211354-060cc04fc18b
+	github.com/jhump/protoreflect v1.13.1-0.20220928232736-101791cb1b4c
 	github.com/magiconair/properties v1.8.6
 	github.com/mattn/go-isatty v0.0.16
 	github.com/otiai10/copy v1.6.0
diff --git a/server/config/config.go b/server/config/config.go
index c46defbbb1f7..39957ada9283 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -148,27 +148,34 @@ type StateSyncConfig struct {
 	SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"`
 }
 
-// MempoolConfig defines the configurations for the SDK built-in app-side mempool
-// implementations.
-type MempoolConfig struct {
-	// MaxTxs defines the behavior of the mempool. A negative value indicates
-	// the mempool is disabled entirely, zero indicates that the mempool is
-	// unbounded in how many txs it may contain, and a positive value indicates
-	// the maximum amount of txs it may contain.
-	MaxTxs int `mapstructure:"max-txs"`
-}
-
-// State Streaming configuration
 type (
-	// StreamingConfig defines application configuration for external streaming services
-	StreamingConfig struct {
-		ABCI ABCIListenerConfig `mapstructure:"abci"`
+	// StoreConfig defines application configuration for state streaming and other
+	// storage related operations.
+	StoreConfig struct {
+		Streamers []string `mapstructure:"streamers"`
+	}
+
+	// StreamersConfig defines concrete state streaming configuration options. These
+	// fields are required to be set when state streaming is enabled via a non-empty
+	// list defined by 'StoreConfig.Streamers'.
+	StreamersConfig struct {
+		File FileStreamerConfig `mapstructure:"file"`
 	}
-	// ABCIListenerConfig defines application configuration for ABCIListener streaming service
-	ABCIListenerConfig struct {
-		Keys          []string `mapstructure:"keys"`
-		Plugin        string   `mapstructure:"plugin"`
-		StopNodeOnErr bool     `mapstructure:"stop-node-on-err"`
+
+	// FileStreamerConfig defines the file streaming configuration options.
+	FileStreamerConfig struct {
+		Keys     []string `mapstructure:"keys"`
+		WriteDir string   `mapstructure:"write_dir"`
+		Prefix   string   `mapstructure:"prefix"`
+		// OutputMetadata specifies whether to output the block metadata file, which includes
+		// the abci requests/responses; otherwise only the data file is output.
+		OutputMetadata bool `mapstructure:"output-metadata"`
+		// StopNodeOnError specifies whether to propagate the streamer errors to the consensus
+		// state machine; it's necessary for the data integrity of the output.
+		StopNodeOnError bool `mapstructure:"stop-node-on-error"`
+		// Fsync specifies whether to call fsync after writing the files; it slows down
+		// the commit, but prevents data loss in the face of a system crash.
+		Fsync bool `mapstructure:"fsync"`
 	}
 )
 
@@ -181,8 +188,8 @@ type Config struct {
 	API       APIConfig       `mapstructure:"api"`
 	GRPC      GRPCConfig      `mapstructure:"grpc"`
 	StateSync StateSyncConfig `mapstructure:"state-sync"`
-	Streaming StreamingConfig `mapstructure:"streaming"`
-	Mempool   MempoolConfig   `mapstructure:"mempool"`
+	Store     StoreConfig     `mapstructure:"store"`
+	Streamers StreamersConfig `mapstructure:"streamers"`
 }
 
 // SetMinGasPrices sets the validator's minimum gas prices.
@@ -241,92 +248,30 @@ func DefaultConfig() *Config {
 			SnapshotInterval:   0,
 			SnapshotKeepRecent: 2,
 		},
-		Streaming: StreamingConfig{
-			ABCI: ABCIListenerConfig{
-				Keys:          []string{},
-				StopNodeOnErr: true,
-			},
+		Store: StoreConfig{
+			Streamers: []string{},
 		},
-		Mempool: MempoolConfig{
-			MaxTxs: -1,
+		Streamers: StreamersConfig{
+			File: FileStreamerConfig{
+				Keys:            []string{"*"},
+				WriteDir:        "data/file_streamer",
+				OutputMetadata:  true,
+				StopNodeOnError: true,
+				// NOTICE: the default config doesn't protect the streamer data integrity
+				// in the face of a system crash.
+				Fsync: false,
+			},
		},
	}
 }
 
 // GetConfig returns a fully parsed Config object.
 func GetConfig(v *viper.Viper) (Config, error) {
-	globalLabelsRaw, ok := v.Get("telemetry.global-labels").([]interface{})
-	if !ok {
-		return Config{}, fmt.Errorf("failed to parse global-labels config")
-	}
-
-	globalLabels := make([][]string, 0, len(globalLabelsRaw))
-	for idx, glr := range globalLabelsRaw {
-		labelsRaw, ok := glr.([]interface{})
-		if !ok {
-			return Config{}, fmt.Errorf("failed to parse global label number %d from config", idx)
-		}
-		if len(labelsRaw) == 2 {
-			globalLabels = append(globalLabels, []string{labelsRaw[0].(string), labelsRaw[1].(string)})
-		}
+	conf := DefaultConfig()
+	if err := v.Unmarshal(conf); err != nil {
+		return Config{}, fmt.Errorf("error extracting app config: %w", err)
 	}
-
-	return Config{
-		BaseConfig: BaseConfig{
-			MinGasPrices:        v.GetString("minimum-gas-prices"),
-			InterBlockCache:     v.GetBool("inter-block-cache"),
-			Pruning:             v.GetString("pruning"),
-			PruningKeepRecent:   v.GetString("pruning-keep-recent"),
-			PruningKeepEvery:    v.GetString("pruning-keep-every"),
-			PruningInterval:     v.GetString("pruning-interval"),
-			HaltHeight:          v.GetUint64("halt-height"),
-			HaltTime:            v.GetUint64("halt-time"),
-			IndexEvents:         v.GetStringSlice("index-events"),
-			MinRetainBlocks:     v.GetUint64("min-retain-blocks"),
-			IAVLCacheSize:       v.GetUint64("iavl-cache-size"),
-			IAVLDisableFastNode: v.GetBool("iavl-disable-fastnode"),
-		},
-		Telemetry: telemetry.Config{
-			ServiceName:             v.GetString("telemetry.service-name"),
-			Enabled:                 v.GetBool("telemetry.enabled"),
-			EnableHostname:          v.GetBool("telemetry.enable-hostname"),
-			EnableHostnameLabel:     v.GetBool("telemetry.enable-hostname-label"),
-			EnableServiceLabel:      v.GetBool("telemetry.enable-service-label"),
-			PrometheusRetentionTime: v.GetInt64("telemetry.prometheus-retention-time"),
-			GlobalLabels:            globalLabels,
-		},
-		API: APIConfig{
-			Enable:             v.GetBool("api.enable"),
-			Swagger:            v.GetBool("api.swagger"),
-			Address:            v.GetString("api.address"),
-			MaxOpenConnections: v.GetUint("api.max-open-connections"),
-			RPCReadTimeout:     v.GetUint("api.rpc-read-timeout"),
-			RPCWriteTimeout:    v.GetUint("api.rpc-write-timeout"),
-			RPCMaxBodyBytes:    v.GetUint("api.rpc-max-body-bytes"),
-			EnableUnsafeCORS:   v.GetBool("api.enabled-unsafe-cors"),
-		},
-		Rosetta: RosettaConfig{
-			Enable:     v.GetBool("rosetta.enable"),
-			Address:    v.GetString("rosetta.address"),
-			Blockchain: v.GetString("rosetta.blockchain"),
-			Network:    v.GetString("rosetta.network"),
-			Retries:    v.GetInt("rosetta.retries"),
-			Offline:    v.GetBool("rosetta.offline"),
-		},
-		GRPC: GRPCConfig{
-			Enable:  v.GetBool("grpc.enable"),
-			Address: v.GetString("grpc.address"),
-		},
-		GRPCWeb: GRPCWebConfig{
-			Enable:           v.GetBool("grpc-web.enable"),
-			Address:          v.GetString("grpc-web.address"),
-			EnableUnsafeCORS: v.GetBool("grpc-web.enable-unsafe-cors"),
-		},
-		StateSync: StateSyncConfig{
-			SnapshotInterval:   v.GetUint64("state-sync.snapshot-interval"),
-			SnapshotKeepRecent: v.GetUint32("state-sync.snapshot-keep-recent"),
-		},
-	}, nil
+	return *conf, nil
 }
 
 // ValidateBasic returns an error if min-gas-prices field is empty in BaseConfig. Otherwise, it returns nil.
diff --git a/server/config/toml.go b/server/config/toml.go
index b22f2470883c..efa4189ef4d7 100644
--- a/server/config/toml.go
+++ b/server/config/toml.go
@@ -211,6 +211,26 @@ snapshot-interval = {{ .StateSync.SnapshotInterval }}
 
 # snapshot-keep-recent specifies the number of recent snapshots to keep and serve (0 to keep all).
snapshot-keep-recent = {{ .StateSync.SnapshotKeepRecent }}
+
+###############################################################################
+###                         Store / State Streaming                        ###
+###############################################################################
+
+[store]
+streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
+
+[streamers]
+[streamers.file]
+keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
+write_dir = "{{ .Streamers.File.WriteDir }}"
+prefix = "{{ .Streamers.File.Prefix }}"
+# output-metadata specifies whether to output the metadata file, which includes
+# the abci requests/responses during the processing of the block.
+output-metadata = "{{ .Streamers.File.OutputMetadata }}"
+# stop-node-on-error specifies whether to propagate the file streamer errors to the consensus state machine.
+stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}"
+# fsync specifies whether to call fsync after writing the files.
+fsync = "{{ .Streamers.File.Fsync }}"
 `

 var configTemplate *template.Template
diff --git a/simapp/app.go b/simapp/app.go
index 8c6db0e27fbe..13205e437cc4 100644
--- a/simapp/app.go
+++ b/simapp/app.go
@@ -226,10 +226,10 @@ func NewSimApp(
 	// not include this key.
 	memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")

-	// configure state listening capabilities using AppOptions
-	// we are doing nothing with the returned streamingServices and waitGroup in this case
+	// load state streaming if enabled
 	if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil {
-		tmos.Exit(err.Error())
+		fmt.Printf("failed to load state streaming: %s", err)
+		os.Exit(1)
 	}

 	app := &SimApp{
diff --git a/store/cachekv/store.go b/store/cachekv/store.go
index 2d24027c6e7b..69f0baba6e32 100644
--- a/store/cachekv/store.go
+++ b/store/cachekv/store.go
@@ -9,7 +9,6 @@ import (
 	dbm "github.com/tendermint/tm-db"

 	"github.com/cosmos/cosmos-sdk/internal/conv"
-	"github.com/cosmos/cosmos-sdk/store/listenkv"
 	"github.com/cosmos/cosmos-sdk/store/tracekv"
 	"github.com/cosmos/cosmos-sdk/store/types"
 	"github.com/cosmos/cosmos-sdk/telemetry"
diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go
index 14ed055aae4e..cf15b13d16d7 100644
--- a/store/cachemulti/store.go
+++ b/store/cachemulti/store.go
@@ -8,7 +8,6 @@ import (

 	"github.com/cosmos/cosmos-sdk/store/cachekv"
 	"github.com/cosmos/cosmos-sdk/store/dbadapter"
-	"github.com/cosmos/cosmos-sdk/store/listenkv"
 	"github.com/cosmos/cosmos-sdk/store/tracekv"
 	"github.com/cosmos/cosmos-sdk/store/types"
 )
@@ -42,9 +41,6 @@ func NewFromKVStore(
 	store types.KVStore, stores map[types.StoreKey]types.CacheWrapper,
 	keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext,
 ) Store {
-	if listeners == nil {
-		listeners = make(map[types.StoreKey][]types.WriteListener)
-	}
 	cms := Store{
 		db:     cachekv.NewStore(store),
 		stores: make(map[types.StoreKey]types.CacheWrap, len(stores)),
@@ -61,9 +57,6 @@ func NewFromKVStore(
 			store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx)
 		}

-		if cms.ListeningEnabled(key) {
-			store = listenkv.NewStore(store.(types.KVStore), key, listeners[key])
-		}
 		cms.stores[key] = cachekv.NewStore(store.(types.KVStore))
 	}

@@ -73,10 +66,10 @@ func NewFromKVStore(
 // NewStore creates a new Store object from a mapping of store keys to
 // CacheWrapper objects. Each CacheWrapper store is a branched store.
func NewStore( - db corestore.KVStoreWithBatch, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey, + db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext, ) Store { - return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext, listeners) + return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext) } func newCacheMultiStoreFromCMS(cms Store) Store { @@ -85,8 +78,7 @@ func newCacheMultiStoreFromCMS(cms Store) Store { stores[k] = v } - // don't pass listeners to nested cache store. - return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext, nil) + return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext) } // SetTracer sets the tracer for the MultiStore that the underlying @@ -117,11 +109,6 @@ func (cms Store) TracingEnabled() bool { return cms.traceWriter != nil } -// LatestVersion returns the branch version of the store -func (cms Store) LatestVersion() int64 { - panic("cannot get latest version from branch cached multi-store") -} - // GetStoreType returns the type of the store. func (cms Store) GetStoreType() types.StoreType { return types.StoreTypeMulti diff --git a/store/dbadapter/store.go b/store/dbadapter/store.go index ba673b98333d..fb6b7df1d8ac 100644 --- a/store/dbadapter/store.go +++ b/store/dbadapter/store.go @@ -3,10 +3,11 @@ package dbadapter import ( "io" - corestore "cosmossdk.io/core/store" - "cosmossdk.io/store/cachekv" - "cosmossdk.io/store/tracekv" - "cosmossdk.io/store/types" + dbm "github.com/tendermint/tm-db" + + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/tracekv" + "github.com/cosmos/cosmos-sdk/store/types" ) // Store is wrapper type for corestore.KVStoreWithBatch with implementation of KVStore @@ -85,5 +86,5 @@ func (dsa Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Ca return cachekv.NewStore(tracekv.NewStore(dsa, w, tc)) } -// corestore.KVStoreWithBatch implements KVStore so we can CacheKVStore it. +// dbm.DB implements KVStore so we can CacheKVStore it. var _ types.KVStore = Store{} diff --git a/store/iavl/store.go b/store/iavl/store.go index 7244530a21f4..a42be732d27e 100644 --- a/store/iavl/store.go +++ b/store/iavl/store.go @@ -14,7 +14,6 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/store/cachekv" - "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/telemetry" @@ -210,7 +209,7 @@ func (st *Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Ca return cachekv.NewStore(tracekv.NewStore(st, w, tc)) } -// Set implements types.KVStore. +// Implements types.KVStore. 
func (st *Store) Set(key, value []byte) { types.AssertValidKey(key) types.AssertValidValue(value) diff --git a/store/iavl/store_test.go b/store/iavl/store_test.go index 4f1efefcf48b..e091b5ec4732 100644 --- a/store/iavl/store_test.go +++ b/store/iavl/store_test.go @@ -674,48 +674,3 @@ func TestCacheWraps(t *testing.T) { cacheWrappedWithTrace := store.CacheWrapWithTrace(nil, nil) require.IsType(t, &cachekv.Store{}, cacheWrappedWithTrace) } - -func TestChangeSets(t *testing.T) { - db := coretesting.NewMemDB() - treeSize := 1000 - treeVersion := int64(10) - targetVersion := int64(6) - tree := iavl.NewMutableTree(db, cacheSize, false, log.NewNopLogger(), iavl.FlushThresholdOption(math.MaxInt)) - - for j := int64(0); j < treeVersion; j++ { - keys := [][]byte{} - for i := 0; i < treeSize; i++ { - keys = append(keys, randBytes(4)) - } - sort.Slice(keys, func(p, q int) bool { - return bytes.Compare(keys[p], keys[q]) < 0 - }) - for i := 0; i < treeSize; i++ { - key := keys[i] - value := randBytes(50) - _, err := tree.Set(key, value) - require.NoError(t, err) - } - _, _, err := tree.SaveVersion() - require.NoError(t, err) - } - - changeSets := []*iavl.ChangeSet{} - iavlStore := UnsafeNewStore(tree) - commitID := iavlStore.LastCommitID() - - require.NoError(t, iavlStore.TraverseStateChanges(targetVersion+1, treeVersion, func(v int64, cs *iavl.ChangeSet) error { - changeSets = append(changeSets, cs) - return nil - })) - require.NoError(t, iavlStore.LoadVersionForOverwriting(targetVersion)) - - for i, cs := range changeSets { - v, err := tree.SaveChangeSet(cs) - require.NoError(t, err) - require.Equal(t, v, targetVersion+int64(i+1)) - } - - restoreCommitID := iavlStore.LastCommitID() - require.Equal(t, commitID, restoreCommitID) -} diff --git a/store/listenkv/store.go b/store/listenkv/store.go index b08a6e395071..a32b74c77f52 100644 --- a/store/listenkv/store.go +++ b/store/listenkv/store.go @@ -140,3 +140,10 @@ func (s *Store) CacheWrap() types.CacheWrap { func (s *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap { panic("cannot CacheWrapWithTrace a ListenKVStore") } + +// onWrite writes a KVStore operation to all of the WriteListeners +func (s *Store) onWrite(delete bool, key, value []byte) { + for _, l := range s.listeners { + l.OnWrite(s.parentStoreKey, key, value, delete) + } +} diff --git a/store/mem/store.go b/store/mem/store.go index df7cdc73b163..fe1b39991d3a 100644 --- a/store/mem/store.go +++ b/store/mem/store.go @@ -3,13 +3,12 @@ package mem import ( "io" - corestore "cosmossdk.io/core/store" - coretesting "cosmossdk.io/core/testing" - "cosmossdk.io/store/cachekv" - "cosmossdk.io/store/dbadapter" - pruningtypes "cosmossdk.io/store/pruning/types" - "cosmossdk.io/store/tracekv" - "cosmossdk.io/store/types" + dbm "github.com/tendermint/tm-db" + + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/tracekv" + "github.com/cosmos/cosmos-sdk/store/types" ) var ( diff --git a/store/prefix/store.go b/store/prefix/store.go index 26b8b0344a79..e0fbecb7b265 100644 --- a/store/prefix/store.go +++ b/store/prefix/store.go @@ -5,9 +5,9 @@ import ( "errors" "io" - "cosmossdk.io/store/cachekv" - "cosmossdk.io/store/tracekv" - "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/tracekv" + "github.com/cosmos/cosmos-sdk/store/types" ) var _ types.KVStore = Store{} @@ -57,7 +57,7 @@ func (s Store) CacheWrapWithTrace(w 
io.Writer, tc types.TraceContext) types.Cach return cachekv.NewStore(tracekv.NewStore(s, w, tc)) } -// Get implements KVStore +// Implements KVStore func (s Store) Get(key []byte) []byte { res := s.parent.Get(s.key(key)) return res diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index c691258fec94..e134961b5ef2 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -556,7 +556,7 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { } stores[k] = store } - return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.getTracingContext(), rs.listeners) + return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.getTracingContext()) } // CacheMultiStoreWithVersion is analogous to CacheMultiStore except that it @@ -579,34 +579,9 @@ func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStor // version does not exist or is pruned, an error should be returned. var err error cacheStore, err = store.(*iavl.Store).GetImmutable(version) - // if we got error from loading a module store - // we fetch commit info of this version - // we use commit info to check if the store existed at this version or not if err != nil { - if commitInfo == nil { - var errCommitInfo error - commitInfo, errCommitInfo = rs.GetCommitInfo(version) - - if errCommitInfo != nil { - return nil, errCommitInfo - } - - for _, storeInfo := range commitInfo.StoreInfos { - storeInfos[storeInfo.Name] = true - } - } - - // If the store existed at this version, it means there's actually an error - // getting the root store at this version. - if storeInfos[key.Name()] { - return nil, err - } - - // If the store donesn't exist at this version, create a dummy one to prevent - // nil pointer panic in newer query APIs. - cacheStore = dbadapter.Store{DB: coretesting.NewMemDB()} + return nil, err } - default: cacheStore = store } @@ -620,7 +595,7 @@ func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStor cachedStores[key] = cacheStore } - return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.getTracingContext(), rs.listeners), nil + return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.getTracingContext()), nil } // GetStore returns a mounted Store for a given StoreKey. 
If the StoreKey does diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go index 5a17d3c8f762..db17f788fee3 100644 --- a/store/rootmulti/store_test.go +++ b/store/rootmulti/store_test.go @@ -687,9 +687,6 @@ func TestCacheWraps(t *testing.T) { cacheWrappedWithTrace := multi.CacheWrapWithTrace(nil, nil) require.IsType(t, cachemulti.Store{}, cacheWrappedWithTrace) - - cacheWrappedWithListeners := multi.CacheWrapWithListeners(nil, nil) - require.IsType(t, cachemulti.Store{}, cacheWrappedWithListeners) } func TestTraceConcurrency(t *testing.T) { diff --git a/store/streaming/constructor.go b/store/streaming/constructor.go index e576f84b83d1..20cd9fe1ff8b 100644 --- a/store/streaming/constructor.go +++ b/store/streaming/constructor.go @@ -2,10 +2,13 @@ package streaming import ( "fmt" + "os" + "path" "strings" "sync" "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" serverTypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store/streaming/file" @@ -15,7 +18,7 @@ import ( ) // ServiceConstructor is used to construct a streaming service -type ServiceConstructor func(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error) +type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, codec.BinaryCodec) (baseapp.StreamingService, error) // ServiceType enum for specifying the type of StreamingService type ServiceType int @@ -26,7 +29,19 @@ const ( // add more in the future ) -// ServiceTypeFromString returns the streaming.ServiceType corresponding to the provided name +// Streaming option keys +const ( + OptStreamersFilePrefix = "streamers.file.prefix" + OptStreamersFileWriteDir = "streamers.file.write_dir" + OptStreamersFileOutputMetadata = "streamers.file.output-metadata" + OptStreamersFileStopNodeOnError = "streamers.file.stop-node-on-error" + OptStreamersFileFsync = "streamers.file.fsync" + + OptStoreStreamers = "store.streamers" +) + +// ServiceTypeFromString returns the streaming.ServiceType corresponding to the +// provided name. func ServiceTypeFromString(name string) ServiceType { switch strings.ToLower(name) { case "file", "f": @@ -63,16 +78,45 @@ func NewServiceConstructor(name string) (ServiceConstructor, error) { return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) } -// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService -func NewFileStreamingService(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error) { - filePrefix := cast.ToString(opts.Get("streamers.file.prefix")) - fileDir := cast.ToString(opts.Get("streamers.file.write_dir")) - return file.NewStreamingService(fileDir, filePrefix, keys, marshaller) +// NewFileStreamingService is the streaming.ServiceConstructor function for +// creating a FileStreamingService. 
+func NewFileStreamingService(
+	opts serverTypes.AppOptions,
+	keys []types.StoreKey,
+	marshaller codec.BinaryCodec,
+) (baseapp.StreamingService, error) {
+	homePath := cast.ToString(opts.Get(flags.FlagHome))
+	filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix))
+	fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir))
+	outputMetadata := cast.ToBool(opts.Get(OptStreamersFileOutputMetadata))
+	stopNodeOnErr := cast.ToBool(opts.Get(OptStreamersFileStopNodeOnError))
+	fsync := cast.ToBool(opts.Get(OptStreamersFileFsync))
+
+	// a relative path is based on the node home directory.
+	if !path.IsAbs(fileDir) {
+		fileDir = path.Join(homePath, fileDir)
+	}
+
+	// try to create the output directory if it does not exist.
+	if _, err := os.Stat(fileDir); os.IsNotExist(err) {
+		if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
+			return nil, err
+		}
+	}
+
+	return file.NewStreamingService(fileDir, filePrefix, keys, marshaller, outputMetadata, stopNodeOnErr, fsync)
 }

-// LoadStreamingServices is a function for loading StreamingServices onto the BaseApp using the provided AppOptions, codec, and keys
-// It returns the WaitGroup and quit channel used to synchronize with the streaming services and any error that occurs during the setup
-func LoadStreamingServices(bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions, appCodec codec.BinaryCodec, keys map[string]*types.KVStoreKey) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
+// LoadStreamingServices is a function for loading StreamingServices onto the
+// BaseApp using the provided AppOptions, codec, and keys. It returns the
+// WaitGroup and quit channel used to synchronize with the streaming services
+// and any error that occurs during the setup.
+func LoadStreamingServices(
+	bApp *baseapp.BaseApp,
+	appOpts serverTypes.AppOptions,
+	appCodec codec.BinaryCodec,
+	keys map[string]*types.KVStoreKey,
+) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
 	// waitgroup and quit channel for optional shutdown coordination of the streaming service(s)
 	wg := new(sync.WaitGroup)
 	// configure state listening capabilities using AppOptions
@@ -107,7 +151,9 @@ func LoadStreamingServices(bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions
 			}
 			return nil, nil, err
 		}
-		// generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose
+
+		// Generate the streaming service using the constructor, appOptions, and the
+		// StoreKeys we want to expose.
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) if err != nil { // close any services we may have already spun up before hitting the error on this one diff --git a/store/streaming/constructor_test.go b/store/streaming/constructor_test.go index 5f9d58016f68..f23fa499e827 100644 --- a/store/streaming/constructor_test.go +++ b/store/streaming/constructor_test.go @@ -1,20 +1,32 @@ -package streaming +package streaming_test import ( "testing" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/codec" codecTypes "github.com/cosmos/cosmos-sdk/codec/types" + serverTypes "github.com/cosmos/cosmos-sdk/server/types" + "github.com/cosmos/cosmos-sdk/simapp" + "github.com/cosmos/cosmos-sdk/store/streaming" "github.com/cosmos/cosmos-sdk/store/streaming/file" "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/tendermint/tendermint/libs/log" + dbm "github.com/tendermint/tm-db" "github.com/stretchr/testify/require" ) type fakeOptions struct{} -func (f *fakeOptions) Get(string) interface{} { return nil } +func (f *fakeOptions) Get(key string) interface{} { + if key == "streamers.file.write_dir" { + return "data/file_streamer" + + } + return nil +} var ( mockOptions = new(fakeOptions) @@ -24,12 +36,12 @@ var ( ) func TestStreamingServiceConstructor(t *testing.T) { - _, err := NewServiceConstructor("unexpectedName") + _, err := streaming.NewServiceConstructor("unexpectedName") require.NotNil(t, err) - constructor, err := NewServiceConstructor("file") + constructor, err := streaming.NewServiceConstructor("file") require.Nil(t, err) - var expectedType ServiceConstructor + var expectedType streaming.ServiceConstructor require.IsType(t, expectedType, constructor) serv, err := constructor(mockOptions, mockKeys, testMarshaller) @@ -41,3 +53,55 @@ func TestStreamingServiceConstructor(t *testing.T) { require.True(t, ok) } } + +func TestLoadStreamingServices(t *testing.T) { + db := dbm.NewMemDB() + encCdc := simapp.MakeTestEncodingConfig() + keys := sdk.NewKVStoreKeys("mockKey1", "mockKey2") + bApp := baseapp.NewBaseApp("appName", log.NewNopLogger(), db, nil) + + testCases := map[string]struct { + appOpts serverTypes.AppOptions + activeStreamersLen int + }{ + "empty app options": { + appOpts: simapp.EmptyAppOptions{}, + }, + "all StoreKeys exposed": { + appOpts: streamingAppOptions{keys: []string{"*"}}, + activeStreamersLen: 1, + }, + "some StoreKey exposed": { + appOpts: streamingAppOptions{keys: []string{"mockKey1"}}, + activeStreamersLen: 1, + }, + "not exposing anything": { + appOpts: streamingAppOptions{keys: []string{"mockKey3"}}, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + activeStreamers, _, err := streaming.LoadStreamingServices(bApp, tc.appOpts, encCdc.Marshaler, keys) + require.NoError(t, err) + require.Equal(t, tc.activeStreamersLen, len(activeStreamers)) + }) + } +} + +type streamingAppOptions struct { + keys []string +} + +func (ao streamingAppOptions) Get(o string) interface{} { + switch o { + case "store.streamers": + return []string{"file"} + case "streamers.file.keys": + return ao.keys + case "streamers.file.write_dir": + return "data/file_streamer" + default: + return nil + } +} diff --git a/store/streaming/file/README.md b/store/streaming/file/README.md index 7243f15dd0bf..0c34de7f3bea 100644 --- a/store/streaming/file/README.md +++ b/store/streaming/file/README.md @@ -1,4 +1,5 @@ # File Streaming Service + This pkg contains an 
implementation of the [StreamingService](../../../baseapp/streaming.go) that writes the data stream out to files on the local filesystem. This process is performed synchronously with the message processing of the state machine. @@ -23,42 +24,68 @@ The `file.StreamingService` is configured from within an App using the `AppOptio We turn the service on by adding its name, "file", to `store.streamers`- the list of streaming services for this App to employ. In `streamers.file` we include three configuration parameters for the file streaming service: -1. `streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service. -In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off. + +1. `streamers.file.keys` contains the list of `StoreKey` names for the KVStores to expose using this service. + In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off. 2. `streamers.file.write_dir` contains the path to the directory to write the files to. 3. `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions -with other App `StreamingService` output files. - -##### Encoding - -For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number. -At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written. -At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written. -In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as -a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service -is configured to listen to. - -For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M -is the tx number in the block (i.e. 0, 1, 2...). -At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written. -At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written. -In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as -a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service -is configured to listen to. - -For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number. -At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written. -At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written. -In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as -a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service -is configured to listen to. - -##### Decoding - -To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto -messages based on the length-prefixing of each message. 
Once segmented, it is known that the first message is the ABCI request, -the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into -the appropriate message type. - -The type of ABCI req/res, the block height, and the transaction index (where relevant) is known -from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message. + with other App `StreamingService` output files. +4. `streamers.file.output-metadata` specifies if output the metadata file, otherwise only data file is outputted. +5. `streamers.file.stop-node-on-error` specifies if propagate the error to consensus state machine, it's nesserary for data integrity when node restarts. +6. `streamers.file.fsync` specifies if call fsync after writing the files, it's nesserary for data integrity when system crash, but slows down the commit time. + +### Encoding + +For each block, two files are created and names `block-{N}-meta` and `block-{N}-data`, where `N` is the block number. + +The meta file contains the protobuf encoded message `BlockMetadata` which contains the abci event requests and responses of the block: + +```protobuf +message BlockMetadata { + message DeliverTx { + tendermint.abci.RequestDeliverTx request = 1; + tendermint.abci.ResponseDeliverTx response = 2; + } + tendermint.abci.RequestBeginBlock request_begin_block = 1; + tendermint.abci.ResponseBeginBlock response_begin_block = 2; + repeated DeliverTx deliver_txs = 3; + tendermint.abci.RequestEndBlock request_end_block = 4; + tendermint.abci.ResponseEndBlock response_end_block = 5; + tendermint.abci.ResponseCommit response_commit = 6; +} +``` + +The data file contains a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores during the execution of block. + +Both meta and data files are prefixed with the length of the data content for consumer to detect completeness of the file, the length is encoded as 8 bytes with big endianness. + +The files are written at abci commit event, by default the error happens will be propagated to interuppted consensus state machine, but fsync is not called, it'll have good performance but have the risk of lossing data in face of rare event of system crash. 
+
+### Decoding
+
+The pseudo-code for decoding is as follows:
+
+```python
+def decode_meta_file(file):
+    bz = file.read(8)
+    if len(bz) < 8:
+        raise "incomplete file exception"
+    size = int.from_bytes(bz, 'big')
+
+    if file.size != size + 8:
+        raise "incomplete file exception"
+
+    return decode_protobuf_message(BlockMetadata, file)
+
+def decode_data_file(file):
+    bz = file.read(8)
+    if len(bz) < 8:
+        raise "incomplete file exception"
+    size = int.from_bytes(bz, 'big')
+
+    if file.size != size + 8:
+        raise "incomplete file exception"
+
+    while not file.eof():
+        yield decode_length_prefixed_protobuf_message(StoreKVPair, file)
+```
diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go
index 685ba7d9d770..c2dafe3d06e1 100644
--- a/store/streaming/file/service.go
+++ b/store/streaming/file/service.go
@@ -1,11 +1,13 @@
 package file

 import (
-	"errors"
+	"bytes"
+	"context"
 	"fmt"
+	"io"
 	"os"
 	"path"
-	"path/filepath"
+	"sort"
 	"sync"

 	abci "github.com/tendermint/tendermint/abci/types"
@@ -14,66 +16,54 @@ import (
 	"github.com/cosmos/cosmos-sdk/codec"
 	"github.com/cosmos/cosmos-sdk/store/types"
 	sdk "github.com/cosmos/cosmos-sdk/types"
+	sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
 )

 var _ baseapp.StreamingService = &StreamingService{}

 // StreamingService is a concrete implementation of StreamingService that writes state changes out to files
 type StreamingService struct {
-	listeners          map[types.StoreKey][]types.WriteListener // the listeners that will be initialized with BaseApp
-	srcChan            <-chan []byte                            // the channel that all the WriteListeners write their data out to
-	filePrefix         string                                   // optional prefix for each of the generated files
-	writeDir           string                                   // directory to write files into
-	codec              codec.BinaryCodec                        // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
-	stateCache         [][]byte                                 // cache the protobuf binary encoded StoreKVPairs in the order they are received
-	stateCacheLock     *sync.Mutex                              // mutex for the state cache
-	currentBlockNumber int64                                    // the current block number
-	currentTxIndex     int64                                    // the index of the current tx
-	quitChan           chan struct{}                            // channel to synchronize closure
-}
-
-// IntermediateWriter is used so that we do not need to update the underlying io.Writer
-// inside the StoreKVPairWriteListener everytime we begin writing to a new file
-type IntermediateWriter struct {
-	outChan chan<- []byte
-}
-
-// NewIntermediateWriter create an instance of an intermediateWriter that sends to the provided channel
-func NewIntermediateWriter(outChan chan<- []byte) *IntermediateWriter {
-	return &IntermediateWriter{
-		outChan: outChan,
-	}
-}
-
-// Write satisfies io.Writer
-func (iw *IntermediateWriter) Write(b []byte) (int, error) {
-	iw.outChan <- b
-	return len(b), nil
+	storeListeners []*types.MemoryListener // a series of KVStore listeners for each KVStore
+	filePrefix     string                  // optional prefix for each of the generated files
+	writeDir       string                  // directory to write files into
+	codec          codec.BinaryCodec       // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
+
+	currentBlockNumber int64
+	blockMetadata      types.BlockMetadata
+	// whether to write the metadata file; otherwise only the data file is output.
+	outputMetadata bool
+	// if true, a failed commit will panic and stop the consensus state machine to ensure the
+	// eventual consistency of the output; otherwise the error is ignored, at the risk of losing data.
+	stopNodeOnErr bool
+	// if true, file.Sync() is called to make sure the data is persisted onto disk; otherwise it risks losing data when the system crashes.
+	fsync bool
 }

 // NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys
-func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec) (*StreamingService, error) {
-	listenChan := make(chan []byte)
-	iw := NewIntermediateWriter(listenChan)
-	listener := types.NewStoreKVPairWriteListener(iw, c)
-	listeners := make(map[types.StoreKey][]types.WriteListener, len(storeKeys))
+func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) {
+	// sort storeKeys for deterministic output
+	sort.SliceStable(storeKeys, func(i, j int) bool {
+		return storeKeys[i].Name() < storeKeys[j].Name()
+	})
+
+	listeners := make([]*types.MemoryListener, len(storeKeys))
 	// in this case, we are using the same listener for each Store
-	for _, key := range storeKeys {
-		listeners[key] = append(listeners[key], listener)
+	for i, key := range storeKeys {
+		listeners[i] = types.NewMemoryListener(key)
 	}
-	// check that the writeDir exists and is writeable so that we can catch the error here at initialization if it is not
+	// check that the writeDir exists and is writable so that we can catch the error here at initialization if it is not
 	// we don't open a dstFile until we receive our first ABCI message
 	if err := isDirWriteable(writeDir); err != nil {
 		return nil, err
 	}
 	return &StreamingService{
-		listeners:      listeners,
-		srcChan:        listenChan,
+		storeListeners: listeners,
 		filePrefix:     filePrefix,
 		writeDir:       writeDir,
 		codec:          c,
-		stateCache:     make([][]byte, 0),
-		stateCacheLock: new(sync.Mutex),
+		outputMetadata: outputMetadata,
+		stopNodeOnErr:  stopNodeOnErr,
+		fsync:          fsync,
 	}, nil
 }

@@ -81,189 +71,105 @@ func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey
 // It returns the StreamingService's underlying WriteListeners
 // Use for registering the underlying WriteListeners with the BaseApp
 func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener {
-	return fss.listeners
+	listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.storeListeners))
+	for _, listener := range fss.storeListeners {
+		listeners[listener.StoreKey()] = []types.WriteListener{listener}
+	}
+	return listeners
 }

 // ListenBeginBlock satisfies the baseapp.ABCIListener interface
 // It writes the received BeginBlock request and response and the resulting state changes
 // out to a file as described in the above the naming schema
-func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
-	// generate the new file
-	dstFile, err := fss.openBeginBlockFile(req)
-	if err != nil {
-		return err
-	}
-	// write req to file
-	lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
-	if err != nil {
-		return err
-	}
-	if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
-		return err
-	}
-	// write all state changes cached for this stage to file
-	fss.stateCacheLock.Lock()
-	for _, stateChange := range fss.stateCache {
-		if _, err = dstFile.Write(stateChange); err != nil {
-			fss.stateCache = nil
-			fss.stateCacheLock.Unlock()
-			return err
-		}
-	}
-	// reset cache
-	fss.stateCache = nil
-	fss.stateCacheLock.Unlock()
-	// write res to file
-
lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
-	if err != nil {
-		return err
-	}
-	if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
-		return err
-	}
-	// close file
-	return dstFile.Close()
-}
-
-func (fss *StreamingService) openBeginBlockFile(req abci.RequestBeginBlock) (*os.File, error) {
-	fss.currentBlockNumber = req.GetHeader().Height
-	fss.currentTxIndex = 0
-	fileName := fmt.Sprintf("block-%d-begin", fss.currentBlockNumber)
-	if fss.filePrefix != "" {
-		fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
-	}
-	return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0o600)
+func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) (rerr error) {
+	fss.blockMetadata.RequestBeginBlock = &req
+	fss.blockMetadata.ResponseBeginBlock = &res
+	fss.currentBlockNumber = req.Header.Height
+	return nil
 }

 // ListenDeliverTx satisfies the baseapp.ABCIListener interface
 // It writes the received DeliverTx request and response and the resulting state changes
 // out to a file as described in the above the naming schema
-func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
-	// generate the new file
-	dstFile, err := fss.openDeliverTxFile()
-	if err != nil {
-		return err
-	}
-	// write req to file
-	lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
+func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) (rerr error) {
+	fss.blockMetadata.DeliverTxs = append(fss.blockMetadata.DeliverTxs, &types.BlockMetadata_DeliverTx{
+		Request:  &req,
+		Response: &res,
+	})
+	return nil
+}
+
+// ListenEndBlock satisfies the baseapp.ABCIListener interface
+// It writes the received EndBlock request and response and the resulting state changes
+// out to a file as described in the above the naming schema
+func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) (rerr error) {
+	fss.blockMetadata.RequestEndBlock = &req
+	fss.blockMetadata.ResponseEndBlock = &res
+	return nil
+}
+
+// ListenCommit satisfies the baseapp.ABCIListener interface
+func (fss *StreamingService) ListenCommit(ctx context.Context, res abci.ResponseCommit) error {
+	err := fss.doListenCommit(ctx, res)
 	if err != nil {
-		return err
-	}
-	if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
-		return err
-	}
-	// write all state changes cached for this stage to file
-	fss.stateCacheLock.Lock()
-	for _, stateChange := range fss.stateCache {
-		if _, err = dstFile.Write(stateChange); err != nil {
-			fss.stateCache = nil
-			fss.stateCacheLock.Unlock()
+		if fss.stopNodeOnErr {
 			return err
 		}
 	}
-	// reset cache
-	fss.stateCache = nil
-	fss.stateCacheLock.Unlock()
-	// write res to file
-	lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
-	if err != nil {
-		return err
-	}
-	if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
-		return err
-	}
-	// close file
-	return dstFile.Close()
+	return nil
 }

-func (fss *StreamingService) openDeliverTxFile() (*os.File, error) {
-	fileName := fmt.Sprintf("block-%d-tx-%d", fss.currentBlockNumber, fss.currentTxIndex)
+func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.ResponseCommit) (err error) {
+	fss.blockMetadata.ResponseCommit = &res
+
+	// write to target files, the file size is written at the beginning,
which can be used to detect completeness. + metaFileName := fmt.Sprintf("block-%d-meta", fss.currentBlockNumber) + dataFileName := fmt.Sprintf("block-%d-data", fss.currentBlockNumber) if fss.filePrefix != "" { - fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) + metaFileName = fmt.Sprintf("%s-%s", fss.filePrefix, metaFileName) + dataFileName = fmt.Sprintf("%s-%s", fss.filePrefix, dataFileName) } - fss.currentTxIndex++ - return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0o600) -} -// ListenEndBlock satisfies the baseapp.ABCIListener interface -// It writes the received EndBlock request and response and the resulting state changes -// out to a file as described in the above the naming schema -func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { - // generate the new file - dstFile, err := fss.openEndBlockFile() - if err != nil { - return err - } - // write req to file - lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) - if err != nil { - return err - } - if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { - return err - } - // write all state changes cached for this stage to file - fss.stateCacheLock.Lock() - for _, stateChange := range fss.stateCache { - if _, err = dstFile.Write(stateChange); err != nil { - fss.stateCache = nil - fss.stateCacheLock.Unlock() + if fss.outputMetadata { + bz, err := fss.codec.Marshal(&fss.blockMetadata) + if err != nil { + return err + } + if err := writeLengthPrefixedFile(path.Join(fss.writeDir, metaFileName), bz, fss.fsync); err != nil { return err } } - // reset cache - fss.stateCache = nil - fss.stateCacheLock.Unlock() - // write res to file - lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) - if err != nil { - return err - } - if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { + + var buf bytes.Buffer + if err := fss.writeBlockData(&buf); err != nil { return err } - // close file - return dstFile.Close() + return writeLengthPrefixedFile(path.Join(fss.writeDir, dataFileName), buf.Bytes(), fss.fsync) } -func (fss *StreamingService) openEndBlockFile() (*os.File, error) { - fileName := fmt.Sprintf("block-%d-end", fss.currentBlockNumber) - if fss.filePrefix != "" { - fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) +func (fss *StreamingService) writeBlockData(writer io.Writer) error { + for _, listener := range fss.storeListeners { + cache := listener.PopStateCache() + for i := range cache { + bz, err := fss.codec.MarshalLengthPrefixed(&cache[i]) + if err != nil { + return err + } + if _, err = writer.Write(bz); err != nil { + return err + } + } } - return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0o600) + return nil } // Stream satisfies the baseapp.StreamingService interface -// It spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs -// and caches them in the order they were received -// returns an error if it is called twice func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { - if fss.quitChan != nil { - return errors.New("`Stream` has already been called. 
The stream needs to be closed before it can be started again")
-	}
-	fss.quitChan = make(chan struct{})
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for {
-			select {
-			case <-fss.quitChan:
-				fss.quitChan = nil
-				return
-			case by := <-fss.srcChan:
-				fss.stateCacheLock.Lock()
-				fss.stateCache = append(fss.stateCache, by)
-				fss.stateCacheLock.Unlock()
-			}
-		}
-	}()
 	return nil
 }

 // Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface
 func (fss *StreamingService) Close() error {
-	close(fss.quitChan)
 	return nil
 }

@@ -276,3 +182,32 @@ func isDirWriteable(dir string) error {
 	}
 	return os.Remove(f)
 }
+
+func writeLengthPrefixedFile(path string, data []byte, fsync bool) (err error) {
+	var f *os.File
+	f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
+	if err != nil {
+		return sdkerrors.Wrapf(err, "open file failed: %s", path)
+	}
+	defer func() {
+		// avoid overriding the real error with file close error
+		if err1 := f.Close(); err1 != nil && err == nil {
+			err = sdkerrors.Wrapf(err1, "close file failed: %s", path)
+		}
+	}()
+	_, err = f.Write(sdk.Uint64ToBigEndian(uint64(len(data))))
+	if err != nil {
+		return sdkerrors.Wrapf(err, "write length prefix failed: %s", path)
+	}
+	_, err = f.Write(data)
+	if err != nil {
+		return sdkerrors.Wrapf(err, "write block data failed: %s", path)
+	}
+	if fsync {
+		err = f.Sync()
+		if err != nil {
+			return sdkerrors.Wrapf(err, "fsync failed: %s", path)
+		}
+	}
+	return
+}
diff --git a/store/streaming/file/service_test.go b/store/streaming/file/service_test.go
index 6ecad254b5ce..fe4e1bee2c26 100644
--- a/store/streaming/file/service_test.go
+++ b/store/streaming/file/service_test.go
@@ -2,20 +2,21 @@ package file

 import (
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
 	"sync"
 	"testing"

+	"github.com/stretchr/testify/require"
+	abci "github.com/tendermint/tendermint/abci/types"
+	types1 "github.com/tendermint/tendermint/proto/tendermint/types"
+
 	"github.com/cosmos/cosmos-sdk/codec"
 	codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
 	"github.com/cosmos/cosmos-sdk/store/types"
 	sdk "github.com/cosmos/cosmos-sdk/types"
-
-	"github.com/stretchr/testify/require"
-	abci "github.com/tendermint/tendermint/abci/types"
-	types1 "github.com/tendermint/tendermint/proto/tendermint/types"
 )

 var (
@@ -23,7 +24,8 @@ var (
 	testMarshaller               = codec.NewProtoCodec(interfaceRegistry)
 	testStreamingService         *StreamingService
 	testListener1, testListener2 types.WriteListener
-	emptyContext                 = sdk.Context{}
+	emptyContext                 = sdk.NewContext(nil, types1.Header{}, false, nil)
+	emptyContextWrap             = sdk.WrapSDKContext(emptyContext)

 	// test abci message types
 	mockHash = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}
@@ -56,6 +58,10 @@ var (
 		ConsensusParamUpdates: &abci.ConsensusParams{},
 		ValidatorUpdates:      []abci.ValidatorUpdate{},
 	}
+	testCommitRes = abci.ResponseCommit{
+		Data:         []byte{1},
+		RetainHeight: 0,
+	}
 	mockTxBytes1      = []byte{9, 8, 7, 6, 5, 4, 3, 2, 1}
 	testDeliverTxReq1 = abci.RequestDeliverTx{
 		Tx: mockTxBytes1,
@@ -104,25 +110,6 @@ var (
 	mockValue3 = []byte{5, 4, 3}
 )

-func TestIntermediateWriter(t *testing.T) {
-	outChan := make(chan []byte, 0)
-	iw := NewIntermediateWriter(outChan)
-	require.IsType(t, &IntermediateWriter{}, iw)
-	testBytes := []byte{1, 2, 3, 4, 5}
-	var length int
-	var err error
-	waitChan := make(chan struct{}, 0)
-	go func() {
-		length, err = iw.Write(testBytes)
-		waitChan <- struct{}{}
-	}()
-	receivedBytes := <-outChan
-	<-waitChan
-	require.Equal(t, len(testBytes), length)
-
require.Equal(t, testBytes, receivedBytes) - require.Nil(t, err) -} - func TestFileStreamingService(t *testing.T) { if os.Getenv("CI") != "" { t.Skip("Skipping TestFileStreamingService in CI environment") @@ -132,29 +119,23 @@ func TestFileStreamingService(t *testing.T) { defer os.RemoveAll(testDir) testKeys := []types.StoreKey{mockStoreKey1, mockStoreKey2} - testStreamingService, err = NewStreamingService(testDir, testPrefix, testKeys, testMarshaller) + testStreamingService, err = NewStreamingService(testDir, testPrefix, testKeys, testMarshaller, true, false, false) require.Nil(t, err) require.IsType(t, &StreamingService{}, testStreamingService) require.Equal(t, testPrefix, testStreamingService.filePrefix) require.Equal(t, testDir, testStreamingService.writeDir) require.Equal(t, testMarshaller, testStreamingService.codec) - testListener1 = testStreamingService.listeners[mockStoreKey1][0] - testListener2 = testStreamingService.listeners[mockStoreKey2][0] + testListener1 = testStreamingService.storeListeners[0] + testListener2 = testStreamingService.storeListeners[1] wg := new(sync.WaitGroup) testStreamingService.Stream(wg) - testListenBeginBlock(t) - testListenDeliverTx1(t) - testListenDeliverTx2(t) - testListenEndBlock(t) + testListenBlock(t) testStreamingService.Close() wg.Wait() } -func testListenBeginBlock(t *testing.T) { - expectedBeginBlockReqBytes, err := testMarshaller.Marshal(&testBeginBlockReq) - require.Nil(t, err) - expectedBeginBlockResBytes, err := testMarshaller.Marshal(&testBeginBlockRes) - require.Nil(t, err) +func testListenBlock(t *testing.T) { + var expectKVPairsStore1, expectKVPairsStore2 [][]byte // write state changes testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) @@ -183,162 +164,102 @@ func testListenBeginBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1, expectedKVPair3) + expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2) // send the ABCI messages - err = testStreamingService.ListenBeginBlock(emptyContext, testBeginBlockReq, testBeginBlockRes) - require.Nil(t, err) - - // load the file, checking that it was created with the expected name - fileName := fmt.Sprintf("%s-block-%d-begin", testPrefix, testBeginBlockReq.GetHeader().Height) - fileBytes, err := readInFile(fileName) - require.Nil(t, err) - - // segment the file into the separate gRPC messages and check the correctness of each - segments, err := segmentBytes(fileBytes) - require.Nil(t, err) - require.Equal(t, 5, len(segments)) - require.Equal(t, expectedBeginBlockReqBytes, segments[0]) - require.Equal(t, expectedKVPair1, segments[1]) - require.Equal(t, expectedKVPair2, segments[2]) - require.Equal(t, expectedKVPair3, segments[3]) - require.Equal(t, expectedBeginBlockResBytes, segments[4]) -} - -func testListenDeliverTx1(t *testing.T) { - expectedDeliverTxReq1Bytes, err := testMarshaller.Marshal(&testDeliverTxReq1) - require.Nil(t, err) - expectedDeliverTxRes1Bytes, err := testMarshaller.Marshal(&testDeliverTxRes1) + err = testStreamingService.ListenBeginBlock(emptyContextWrap, testBeginBlockReq, testBeginBlockRes) require.Nil(t, err) // write state changes testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) testListener2.OnWrite(mockStoreKey2, mockKey2, mockValue2, false) - testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) + testListener2.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) // expected KV pairs - expectedKVPair1, err := 
testMarshaller.Marshal(&types.StoreKVPair{ + expectedKVPair1, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey1, Value: mockValue1, Delete: false, }) require.Nil(t, err) - expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey2, Value: mockValue2, Delete: false, }) require.Nil(t, err) - expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, Value: mockValue3, Delete: false, }) require.Nil(t, err) + expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1) + expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2, expectedKVPair3) // send the ABCI messages - err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq1, testDeliverTxRes1) - require.Nil(t, err) - - // load the file, checking that it was created with the expected name - fileName := fmt.Sprintf("%s-block-%d-tx-%d", testPrefix, testBeginBlockReq.GetHeader().Height, 0) - fileBytes, err := readInFile(fileName) - require.Nil(t, err) - - // segment the file into the separate gRPC messages and check the correctness of each - segments, err := segmentBytes(fileBytes) - require.Nil(t, err) - require.Equal(t, 5, len(segments)) - require.Equal(t, expectedDeliverTxReq1Bytes, segments[0]) - require.Equal(t, expectedKVPair1, segments[1]) - require.Equal(t, expectedKVPair2, segments[2]) - require.Equal(t, expectedKVPair3, segments[3]) - require.Equal(t, expectedDeliverTxRes1Bytes, segments[4]) -} - -func testListenDeliverTx2(t *testing.T) { - expectedDeliverTxReq2Bytes, err := testMarshaller.Marshal(&testDeliverTxReq2) - require.Nil(t, err) - expectedDeliverTxRes2Bytes, err := testMarshaller.Marshal(&testDeliverTxRes2) + err = testStreamingService.ListenDeliverTx(emptyContextWrap, testDeliverTxReq1, testDeliverTxRes1) require.Nil(t, err) // write state changes - testListener1.OnWrite(mockStoreKey2, mockKey1, mockValue1, false) - testListener2.OnWrite(mockStoreKey1, mockKey2, mockValue2, false) - testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) + testListener2.OnWrite(mockStoreKey2, mockKey1, mockValue1, false) + testListener1.OnWrite(mockStoreKey1, mockKey2, mockValue2, false) + testListener2.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) // expected KV pairs - expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{ + expectedKVPair1, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey1, Value: mockValue1, Delete: false, }) require.Nil(t, err) - expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey2, Value: mockValue2, Delete: false, }) require.Nil(t, err) - expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, Value: mockValue3, Delete: false, }) require.Nil(t, err) + expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair2) + expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair1, expectedKVPair3) // send the ABCI messages - err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq2, testDeliverTxRes2) - require.Nil(t, err) - - // load the file, checking that 
it was created with the expected name
-	fileName := fmt.Sprintf("%s-block-%d-tx-%d", testPrefix, testBeginBlockReq.GetHeader().Height, 1)
-	fileBytes, err := readInFile(fileName)
-	require.Nil(t, err)
-
-	// segment the file into the separate gRPC messages and check the correctness of each
-	segments, err := segmentBytes(fileBytes)
-	require.Nil(t, err)
-	require.Equal(t, 5, len(segments))
-	require.Equal(t, expectedDeliverTxReq2Bytes, segments[0])
-	require.Equal(t, expectedKVPair1, segments[1])
-	require.Equal(t, expectedKVPair2, segments[2])
-	require.Equal(t, expectedKVPair3, segments[3])
-	require.Equal(t, expectedDeliverTxRes2Bytes, segments[4])
-}
-
-func testListenEndBlock(t *testing.T) {
-	expectedEndBlockReqBytes, err := testMarshaller.Marshal(&testEndBlockReq)
-	require.Nil(t, err)
-	expectedEndBlockResBytes, err := testMarshaller.Marshal(&testEndBlockRes)
+	err = testStreamingService.ListenDeliverTx(emptyContextWrap, testDeliverTxReq2, testDeliverTxRes2)
 	require.Nil(t, err)

 	// write state changes
 	testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false)
-	testListener2.OnWrite(mockStoreKey1, mockKey2, mockValue2, false)
-	testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false)
+	testListener1.OnWrite(mockStoreKey1, mockKey2, mockValue2, false)
+	testListener2.OnWrite(mockStoreKey2, mockKey3, mockValue3, false)

 	// expected KV pairs
-	expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{
+	expectedKVPair1, err = testMarshaller.Marshal(&types.StoreKVPair{
 		StoreKey: mockStoreKey1.Name(),
 		Key:      mockKey1,
 		Value:    mockValue1,
 		Delete:   false,
 	})
 	require.Nil(t, err)
-	expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{
+	expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{
 		StoreKey: mockStoreKey1.Name(),
 		Key:      mockKey2,
 		Value:    mockValue2,
 		Delete:   false,
 	})
 	require.Nil(t, err)
-	expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{
+	expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{
 		StoreKey: mockStoreKey2.Name(),
 		Key:      mockKey3,
 		Value:    mockValue3,
@@ -346,32 +267,60 @@ func testListenEndBlock(t *testing.T) {
 	})
 	require.Nil(t, err)

+	expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1, expectedKVPair2)
+	expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair3)
+
 	// send the ABCI messages
-	err = testStreamingService.ListenEndBlock(emptyContext, testEndBlockReq, testEndBlockRes)
+	err = testStreamingService.ListenEndBlock(emptyContextWrap, testEndBlockReq, testEndBlockRes)
+	require.Nil(t, err)
+
+	err = testStreamingService.ListenCommit(emptyContextWrap, testCommitRes)
 	require.Nil(t, err)

 	// load the file, checking that it was created with the expected name
-	fileName := fmt.Sprintf("%s-block-%d-end", testPrefix, testEndBlockReq.Height)
-	fileBytes, err := readInFile(fileName)
+	metaFileName := fmt.Sprintf("%s-block-%d-meta", testPrefix, testBeginBlockReq.GetHeader().Height)
+	dataFileName := fmt.Sprintf("%s-block-%d-data", testPrefix, testBeginBlockReq.GetHeader().Height)
+	metaFileBytes, err := readInFile(metaFileName)
+	require.Nil(t, err)
+	dataFileBytes, err := readInFile(dataFileName)
+	require.Nil(t, err)
+
+	metadata := types.BlockMetadata{
+		RequestBeginBlock:  &testBeginBlockReq,
+		ResponseBeginBlock: &testBeginBlockRes,
+		RequestEndBlock:    &testEndBlockReq,
+		ResponseEndBlock:   &testEndBlockRes,
+		ResponseCommit:     &testCommitRes,
+		DeliverTxs: []*types.BlockMetadata_DeliverTx{
+			{Request: &testDeliverTxReq1, Response: &testDeliverTxRes1},
+			{Request: &testDeliverTxReq2, Response:
&testDeliverTxRes2},
+		},
+	}
+	expectedMetadataBytes, err := testMarshaller.Marshal(&metadata)
 	require.Nil(t, err)
+	require.Equal(t, expectedMetadataBytes, metaFileBytes)

 	// segment the file into the separate gRPC messages and check the correctness of each
-	segments, err := segmentBytes(fileBytes)
+	segments, err := segmentBytes(dataFileBytes)
 	require.Nil(t, err)
-	require.Equal(t, 5, len(segments))
-	require.Equal(t, expectedEndBlockReqBytes, segments[0])
-	require.Equal(t, expectedKVPair1, segments[1])
-	require.Equal(t, expectedKVPair2, segments[2])
-	require.Equal(t, expectedKVPair3, segments[3])
-	require.Equal(t, expectedEndBlockResBytes, segments[4])
+	require.Equal(t, len(expectKVPairsStore1)+len(expectKVPairsStore2), len(segments))
+	require.Equal(t, expectKVPairsStore1, segments[:len(expectKVPairsStore1)])
+	require.Equal(t, expectKVPairsStore2, segments[len(expectKVPairsStore1):])
 }

 func readInFile(name string) ([]byte, error) {
 	path := filepath.Join(testDir, name)
-	return os.ReadFile(path)
+	bz, err := os.ReadFile(path)
+	if err != nil {
+		return nil, err
+	}
+	size := sdk.BigEndianToUint64(bz[:8])
+	if len(bz) != int(size)+8 {
+		return nil, errors.New("incomplete file")
+	}
+	return bz[8:], nil
 }

-// Returns all of the protobuf messages contained in the byte array as an array of byte arrays
+// segmentBytes returns all of the protobuf messages contained in the byte array as an array of byte arrays
 // The messages have their length prefix removed
 func segmentBytes(bz []byte) ([][]byte, error) {
 	var err error
@@ -387,7 +336,7 @@ func segmentBytes(bz []byte) ([][]byte, error) {
 	return segments, nil
 }

-// Returns the bytes for the leading protobuf object in the byte array (removing the length prefix) and returns the remainder of the byte array
+// getHeadSegment returns the bytes for the leading protobuf object in the byte array (removing the length prefix) along with the remainder of the byte array
 func getHeadSegment(bz []byte) ([]byte, []byte, error) {
 	size, prefixSize := binary.Uvarint(bz)
 	if prefixSize < 0 {
diff --git a/store/types/listening.go b/store/types/listening.go
index 75828793ffc1..fde18ea1c748 100644
--- a/store/types/listening.go
+++ b/store/types/listening.go
@@ -26,3 +26,37 @@ func (fl *MemoryListener) PopStateCache() []*StoreKVPair {
 	fl.stateCache = nil
 	return res
 }
+
+// MemoryListener listens to the state writes and accumulates the records in memory.
+type MemoryListener struct {
+	key        StoreKey
+	stateCache []StoreKVPair
+}
+
+// NewMemoryListener creates a listener that accumulates the state writes in memory.
+func NewMemoryListener(key StoreKey) *MemoryListener {
+	return &MemoryListener{key: key}
+}
+
+// OnWrite implements the WriteListener interface
+func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error {
+	fl.stateCache = append(fl.stateCache, StoreKVPair{
+		StoreKey: storeKey.Name(),
+		Delete:   delete,
+		Key:      key,
+		Value:    value,
+	})
+	return nil
+}
+
+// PopStateCache returns the current state cache and resets it to nil
+func (fl *MemoryListener) PopStateCache() []StoreKVPair {
+	res := fl.stateCache
+	fl.stateCache = nil
+	return res
+}
+
+// StoreKey returns the storeKey it listens to
+func (fl *MemoryListener) StoreKey() StoreKey {
+	return fl.key
+}
diff --git a/store/types/listening.pb.go b/store/types/listening.pb.go
index 66b3769e12d3..d63e6730b124 100644
--- a/store/types/listening.pb.go
+++ b/store/types/listening.pb.go
@@ -5,9 +5,8 @@ package types

 import (
 	fmt "fmt"
-	v1 "github.com/cometbft/cometbft/api/cometbft/abci/v1"
-	_ "github.com/cosmos/cosmos-proto"
-	proto "github.com/cosmos/gogoproto/proto"
+	proto "github.com/gogo/protobuf/proto"
+	types "github.com/tendermint/tendermint/abci/types"
 	io "io"
 	math "math"
 	math_bits "math/bits"
@@ -100,16 +99,19 @@ func (m *StoreKVPair) GetValue() []byte {
 // BlockMetadata contains all the abci event data of a block
 // the file streamer dump them into files together with the state changes.
 type BlockMetadata struct {
-	ResponseCommit *v1.CommitResponse `protobuf:"bytes,6,opt,name=response_commit,json=responseCommit,proto3" json:"response_commit,omitempty"`
-	RequestFinalizeBlock *v1.FinalizeBlockRequest `protobuf:"bytes,7,opt,name=request_finalize_block,json=requestFinalizeBlock,proto3" json:"request_finalize_block,omitempty"`
-	ResponseFinalizeBlock *v1.FinalizeBlockResponse `protobuf:"bytes,8,opt,name=response_finalize_block,json=responseFinalizeBlock,proto3" json:"response_finalize_block,omitempty"`
+	RequestBeginBlock *types.RequestBeginBlock `protobuf:"bytes,1,opt,name=request_begin_block,json=requestBeginBlock,proto3" json:"request_begin_block,omitempty"`
+	ResponseBeginBlock *types.ResponseBeginBlock `protobuf:"bytes,2,opt,name=response_begin_block,json=responseBeginBlock,proto3" json:"response_begin_block,omitempty"`
+	DeliverTxs []*BlockMetadata_DeliverTx `protobuf:"bytes,3,rep,name=deliver_txs,json=deliverTxs,proto3" json:"deliver_txs,omitempty"`
+	RequestEndBlock *types.RequestEndBlock `protobuf:"bytes,4,opt,name=request_end_block,json=requestEndBlock,proto3" json:"request_end_block,omitempty"`
+	ResponseEndBlock *types.ResponseEndBlock `protobuf:"bytes,5,opt,name=response_end_block,json=responseEndBlock,proto3" json:"response_end_block,omitempty"`
+	ResponseCommit *types.ResponseCommit `protobuf:"bytes,6,opt,name=response_commit,json=responseCommit,proto3" json:"response_commit,omitempty"`
 }

 func (m *BlockMetadata) Reset()         { *m = BlockMetadata{} }
 func (m *BlockMetadata) String() string { return proto.CompactTextString(m) }
 func (*BlockMetadata) ProtoMessage()    {}
 func (*BlockMetadata) Descriptor() ([]byte, []int) {
-	return fileDescriptor_b6caeb9d7b7c7c10, []int{1}
+	return fileDescriptor_a5d350879fe4fecd, []int{1}
 }
 func (m *BlockMetadata) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -138,64 +140,143 @@ func (m *BlockMetadata) XXX_DiscardUnknown() {

 var xxx_messageInfo_BlockMetadata proto.InternalMessageInfo

-func (m *BlockMetadata) GetResponseCommit() *v1.CommitResponse {
+func (m *BlockMetadata) GetRequestBeginBlock() *types.RequestBeginBlock {
+	if m != nil {
+ return m.RequestBeginBlock + } + return nil +} + +func (m *BlockMetadata) GetResponseBeginBlock() *types.ResponseBeginBlock { + if m != nil { + return m.ResponseBeginBlock + } + return nil +} + +func (m *BlockMetadata) GetDeliverTxs() []*BlockMetadata_DeliverTx { + if m != nil { + return m.DeliverTxs + } + return nil +} + +func (m *BlockMetadata) GetRequestEndBlock() *types.RequestEndBlock { + if m != nil { + return m.RequestEndBlock + } + return nil +} + +func (m *BlockMetadata) GetResponseEndBlock() *types.ResponseEndBlock { + if m != nil { + return m.ResponseEndBlock + } + return nil +} + +func (m *BlockMetadata) GetResponseCommit() *types.ResponseCommit { if m != nil { return m.ResponseCommit } return nil } -func (m *BlockMetadata) GetRequestFinalizeBlock() *v1.FinalizeBlockRequest { +// DeliverTx encapulate deliver tx request and response. +type BlockMetadata_DeliverTx struct { + Request *types.RequestDeliverTx `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` + Response *types.ResponseDeliverTx `protobuf:"bytes,2,opt,name=response,proto3" json:"response,omitempty"` +} + +func (m *BlockMetadata_DeliverTx) Reset() { *m = BlockMetadata_DeliverTx{} } +func (m *BlockMetadata_DeliverTx) String() string { return proto.CompactTextString(m) } +func (*BlockMetadata_DeliverTx) ProtoMessage() {} +func (*BlockMetadata_DeliverTx) Descriptor() ([]byte, []int) { + return fileDescriptor_a5d350879fe4fecd, []int{1, 0} +} +func (m *BlockMetadata_DeliverTx) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *BlockMetadata_DeliverTx) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_BlockMetadata_DeliverTx.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *BlockMetadata_DeliverTx) XXX_Merge(src proto.Message) { + xxx_messageInfo_BlockMetadata_DeliverTx.Merge(m, src) +} +func (m *BlockMetadata_DeliverTx) XXX_Size() int { + return m.Size() +} +func (m *BlockMetadata_DeliverTx) XXX_DiscardUnknown() { + xxx_messageInfo_BlockMetadata_DeliverTx.DiscardUnknown(m) +} + +var xxx_messageInfo_BlockMetadata_DeliverTx proto.InternalMessageInfo + +func (m *BlockMetadata_DeliverTx) GetRequest() *types.RequestDeliverTx { if m != nil { - return m.RequestFinalizeBlock + return m.Request } return nil } -func (m *BlockMetadata) GetResponseFinalizeBlock() *v1.FinalizeBlockResponse { +func (m *BlockMetadata_DeliverTx) GetResponse() *types.ResponseDeliverTx { if m != nil { - return m.ResponseFinalizeBlock + return m.Response } return nil } func init() { - proto.RegisterType((*StoreKVPair)(nil), "cosmos.store.v1beta1.StoreKVPair") - proto.RegisterType((*BlockMetadata)(nil), "cosmos.store.v1beta1.BlockMetadata") + proto.RegisterType((*StoreKVPair)(nil), "cosmos.base.store.v1beta1.StoreKVPair") + proto.RegisterType((*BlockMetadata)(nil), "cosmos.base.store.v1beta1.BlockMetadata") + proto.RegisterType((*BlockMetadata_DeliverTx)(nil), "cosmos.base.store.v1beta1.BlockMetadata.DeliverTx") } func init() { - proto.RegisterFile("cosmos/store/v1beta1/listening.proto", fileDescriptor_b6caeb9d7b7c7c10) -} - -var fileDescriptor_b6caeb9d7b7c7c10 = []byte{ - // 416 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0x41, 0x6f, 0xd3, 0x30, - 0x14, 0xc7, 0xeb, 0xd6, 0x2d, 0x9e, 0x07, 0x2c, 0x32, 0x65, 0x84, 0x81, 0xa2, 0x68, 0x42, 0xd0, - 0xcb, 0x1c, 0xba, 0x71, 0xe2, 
0x38, 0x24, 0x24, 0x32, 0x21, 0xa1, 0x20, 0x71, 0x40, 0x48, 0x91, - 0x93, 0xbe, 0x21, 0xab, 0x69, 0x5c, 0x62, 0x2f, 0x52, 0xb9, 0xf0, 0x15, 0xf8, 0x30, 0x48, 0x7c, - 0x05, 0x8e, 0x13, 0x27, 0x8e, 0xa8, 0xfd, 0x22, 0x28, 0xb6, 0x8b, 0x34, 0x38, 0xec, 0x96, 0xff, - 0x7b, 0xff, 0xff, 0xcf, 0xcf, 0xf1, 0xa3, 0x8f, 0x4a, 0xa5, 0x17, 0x4a, 0x27, 0xda, 0xa8, 0x06, - 0x92, 0x76, 0x5a, 0x80, 0x11, 0xd3, 0xa4, 0x92, 0xda, 0x40, 0x2d, 0xeb, 0x8f, 0x7c, 0xd9, 0x28, - 0xa3, 0xd8, 0xd8, 0xb9, 0xb8, 0x75, 0x71, 0xef, 0x3a, 0x78, 0x58, 0xaa, 0x05, 0x98, 0xe2, 0xdc, - 0x24, 0xa2, 0x28, 0x65, 0xd2, 0x4e, 0x13, 0xb3, 0x5a, 0x82, 0x76, 0x99, 0x83, 0xfb, 0x2e, 0x93, - 0x5b, 0x95, 0x78, 0x80, 0x15, 0x87, 0x5f, 0xe8, 0xee, 0xdb, 0x8e, 0x74, 0xf6, 0xee, 0x8d, 0x90, - 0x0d, 0x7b, 0x40, 0x77, 0x2c, 0x38, 0x9f, 0xc3, 0x2a, 0x44, 0x31, 0x9a, 0xec, 0x64, 0xc4, 0x16, - 0xce, 0x60, 0xc5, 0xf6, 0xe9, 0x68, 0x06, 0x15, 0x18, 0x08, 0xfb, 0x31, 0x9a, 0x90, 0xcc, 0x2b, - 0x16, 0xd0, 0x41, 0x67, 0x1f, 0xc4, 0x68, 0x72, 0x33, 0xeb, 0x3e, 0xd9, 0x98, 0x0e, 0x5b, 0x51, - 0x5d, 0x40, 0x88, 0x6d, 0xcd, 0x89, 0xe7, 0x77, 0x7e, 0x7e, 0x3b, 0xda, 0x73, 0xa7, 0x1f, 0xe9, - 0xd9, 0x3c, 0x7e, 0xca, 0x9f, 0x9d, 0x1c, 0x7e, 0xef, 0xd3, 0x5b, 0xa7, 0x95, 0x2a, 0xe7, 0xaf, - 0xc1, 0x88, 0x99, 0x30, 0x82, 0xbd, 0xa2, 0x7b, 0x0d, 0xe8, 0xa5, 0xaa, 0x35, 0xe4, 0xa5, 0x5a, - 0x2c, 0xa4, 0x09, 0x47, 0x31, 0x9a, 0xec, 0x1e, 0xc7, 0x7c, 0x7b, 0x4b, 0xde, 0xdd, 0x92, 0xb7, - 0x53, 0xfe, 0xc2, 0xf6, 0x33, 0x6f, 0xcf, 0x6e, 0x6f, 0x83, 0xae, 0xce, 0x3e, 0xd0, 0xfd, 0x06, - 0x3e, 0x5d, 0x80, 0x36, 0xf9, 0xb9, 0xac, 0x45, 0x25, 0x3f, 0x43, 0x5e, 0x74, 0x87, 0x85, 0x37, - 0x2c, 0xf1, 0xf1, 0xff, 0xc4, 0x97, 0xde, 0x67, 0x67, 0xca, 0x5c, 0x38, 0x1b, 0x7b, 0xca, 0x95, - 0x26, 0xcb, 0xe9, 0xbd, 0xbf, 0x83, 0xfe, 0x83, 0x27, 0x16, 0xff, 0xe4, 0x5a, 0xbc, 0x9f, 0xfb, - 0xee, 0x96, 0x73, 0xa5, 0x9d, 0x62, 0x82, 0x82, 0x7e, 0x8a, 0x49, 0x3f, 0x18, 0xa4, 0x98, 0x0c, - 0x02, 0x9c, 0x62, 0x82, 0x83, 0x61, 0x8a, 0xc9, 0x30, 0x18, 0x9d, 0x1e, 0xff, 0x58, 0x47, 0xe8, - 0x72, 0x1d, 0xa1, 0xdf, 0xeb, 0x08, 0x7d, 0xdd, 0x44, 0xbd, 0xcb, 0x4d, 0xd4, 0xfb, 0xb5, 0x89, - 0x7a, 0xef, 0x43, 0xf7, 0x93, 0xf5, 0x6c, 0xce, 0xa5, 0xf2, 0xfb, 0x64, 0xf7, 0xa1, 0x18, 0xd9, - 0x57, 0x3f, 0xf9, 0x13, 0x00, 0x00, 0xff, 0xff, 0x98, 0x9f, 0x12, 0x13, 0x6c, 0x02, 0x00, 0x00, + proto.RegisterFile("cosmos/base/store/v1beta1/listening.proto", fileDescriptor_a5d350879fe4fecd) +} + +var fileDescriptor_a5d350879fe4fecd = []byte{ + // 473 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x93, 0x4f, 0x6f, 0xd3, 0x40, + 0x10, 0xc5, 0xe3, 0xa6, 0x09, 0xc9, 0x04, 0x68, 0x59, 0x2a, 0x64, 0x5a, 0xc9, 0xb8, 0xe1, 0x62, + 0x0e, 0xac, 0xd5, 0x70, 0x44, 0xe2, 0x10, 0x40, 0x42, 0x2a, 0x08, 0xe4, 0x02, 0x07, 0x2e, 0x96, + 0xff, 0x8c, 0xc2, 0x12, 0xdb, 0x1b, 0x76, 0x37, 0x51, 0x73, 0xe6, 0xc2, 0x91, 0x8f, 0xc5, 0xb1, + 0x47, 0x8e, 0x28, 0xf9, 0x22, 0xc8, 0x6b, 0xc7, 0xa9, 0x43, 0x7d, 0xca, 0xee, 0xe4, 0xbd, 0x9f, + 0xdf, 0xcc, 0x6a, 0xe0, 0x49, 0xc4, 0x65, 0xca, 0xa5, 0x1b, 0x06, 0x12, 0x5d, 0xa9, 0xb8, 0x40, + 0x77, 0x71, 0x16, 0xa2, 0x0a, 0xce, 0xdc, 0x84, 0x49, 0x85, 0x19, 0xcb, 0x26, 0x74, 0x26, 0xb8, + 0xe2, 0xe4, 0x61, 0x21, 0xa5, 0xb9, 0x94, 0x6a, 0x29, 0x2d, 0xa5, 0xc7, 0x27, 0x0a, 0xb3, 0x18, + 0x45, 0xca, 0x32, 0xe5, 0x06, 0x61, 0xc4, 0x5c, 0xb5, 0x9c, 0xa1, 0x2c, 0x7c, 0xc3, 0x6f, 0x30, + 0xb8, 0xc8, 0xd5, 0xe7, 0x9f, 0x3f, 0x04, 0x4c, 0x90, 0x13, 0xe8, 0x6b, 0xb3, 0x3f, 0xc5, 0xa5, + 0x69, 0xd8, 0x86, 0xd3, 0xf7, 0x7a, 0xba, 0x70, 0x8e, 
0x4b, 0xf2, 0x00, 0xba, 0x31, 0x26, 0xa8, + 0xd0, 0xdc, 0xb3, 0x0d, 0xa7, 0xe7, 0x95, 0x37, 0x72, 0x08, 0xed, 0x5c, 0xde, 0xb6, 0x0d, 0xe7, + 0xb6, 0x97, 0x1f, 0xc9, 0x11, 0x74, 0x16, 0x41, 0x32, 0x47, 0x73, 0x5f, 0xd7, 0x8a, 0xcb, 0xf0, + 0x47, 0x07, 0xee, 0x8c, 0x13, 0x1e, 0x4d, 0xdf, 0xa1, 0x0a, 0xe2, 0x40, 0x05, 0xc4, 0x83, 0xfb, + 0x02, 0xbf, 0xcf, 0x51, 0x2a, 0x3f, 0xc4, 0x09, 0xcb, 0xfc, 0x30, 0xff, 0x5b, 0x7f, 0x78, 0x30, + 0x1a, 0xd2, 0x6d, 0x70, 0x9a, 0x07, 0xa7, 0x5e, 0xa1, 0x1d, 0xe7, 0x52, 0x0d, 0xf2, 0xee, 0x89, + 0xdd, 0x12, 0xf9, 0x04, 0x47, 0x02, 0xe5, 0x8c, 0x67, 0x12, 0x6b, 0xd0, 0x3d, 0x0d, 0x7d, 0x7c, + 0x03, 0xb4, 0x10, 0x5f, 0xa3, 0x12, 0xf1, 0x5f, 0x8d, 0x5c, 0xc0, 0x20, 0xc6, 0x84, 0x2d, 0x50, + 0xf8, 0xea, 0x52, 0x9a, 0x6d, 0xbb, 0xed, 0x0c, 0x46, 0x23, 0xda, 0x38, 0x76, 0x5a, 0xeb, 0x94, + 0xbe, 0x2a, 0xbc, 0x1f, 0x2f, 0x3d, 0x88, 0x37, 0x47, 0x49, 0xde, 0xc2, 0xa6, 0x01, 0x1f, 0xb3, + 0xb8, 0x0c, 0xba, 0xaf, 0x83, 0xda, 0x4d, 0xdd, 0xbf, 0xce, 0xe2, 0x22, 0xe5, 0x81, 0xa8, 0x17, + 0xc8, 0x7b, 0xa8, 0x82, 0x5f, 0xc3, 0x75, 0x34, 0xee, 0xb4, 0xb1, 0xef, 0x8a, 0x77, 0x28, 0x76, + 0x2a, 0xe4, 0x0d, 0x1c, 0x54, 0xc0, 0x88, 0xa7, 0x29, 0x53, 0x66, 0x57, 0xd3, 0x1e, 0x35, 0xd2, + 0x5e, 0x6a, 0x99, 0x77, 0x57, 0xd4, 0xee, 0xc7, 0x3f, 0x0d, 0xe8, 0x57, 0x23, 0x20, 0xcf, 0xe1, + 0x56, 0x99, 0xbd, 0x7c, 0xea, 0xd3, 0xa6, 0x66, 0xb7, 0x63, 0xdb, 0x38, 0xc8, 0x0b, 0xe8, 0x6d, + 0xe0, 0xe5, 0x9b, 0x0e, 0x1b, 0xd3, 0x6c, 0xed, 0x95, 0x67, 0x3c, 0xfe, 0xbd, 0xb2, 0x8c, 0xab, + 0x95, 0x65, 0xfc, 0x5d, 0x59, 0xc6, 0xaf, 0xb5, 0xd5, 0xba, 0x5a, 0x5b, 0xad, 0x3f, 0x6b, 0xab, + 0xf5, 0xc5, 0x99, 0x30, 0xf5, 0x75, 0x1e, 0xd2, 0x88, 0xa7, 0x6e, 0xb9, 0x79, 0xc5, 0xcf, 0x53, + 0x19, 0x4f, 0xcb, 0xfd, 0xd3, 0xbb, 0x13, 0x76, 0xf5, 0xf2, 0x3c, 0xfb, 0x17, 0x00, 0x00, 0xff, + 0xff, 0x69, 0x5c, 0x8f, 0x23, 0xa1, 0x03, 0x00, 0x00, } func (m *StoreKVPair) Marshal() (dAtA []byte, err error) { @@ -272,9 +353,9 @@ func (m *BlockMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.ResponseFinalizeBlock != nil { + if m.ResponseCommit != nil { { - size, err := m.ResponseFinalizeBlock.MarshalToSizedBuffer(dAtA[:i]) + size, err := m.ResponseCommit.MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -282,11 +363,11 @@ func (m *BlockMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintListening(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x42 + dAtA[i] = 0x32 } - if m.RequestFinalizeBlock != nil { + if m.ResponseEndBlock != nil { { - size, err := m.RequestFinalizeBlock.MarshalToSizedBuffer(dAtA[:i]) + size, err := m.ResponseEndBlock.MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -294,11 +375,11 @@ func (m *BlockMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintListening(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x3a + dAtA[i] = 0x2a } - if m.ResponseCommit != nil { + if m.RequestEndBlock != nil { { - size, err := m.ResponseCommit.MarshalToSizedBuffer(dAtA[:i]) + size, err := m.RequestEndBlock.MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -306,7 +387,92 @@ func (m *BlockMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintListening(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x32 + dAtA[i] = 0x22 + } + if len(m.DeliverTxs) > 0 { + for iNdEx := len(m.DeliverTxs) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.DeliverTxs[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintListening(dAtA, i, uint64(size)) 
+ } + i-- + dAtA[i] = 0x1a + } + } + if m.ResponseBeginBlock != nil { + { + size, err := m.ResponseBeginBlock.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintListening(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.RequestBeginBlock != nil { + { + size, err := m.RequestBeginBlock.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintListening(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *BlockMetadata_DeliverTx) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *BlockMetadata_DeliverTx) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *BlockMetadata_DeliverTx) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Response != nil { + { + size, err := m.Response.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintListening(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Request != nil { + { + size, err := m.Request.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintListening(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa } return len(dAtA) - i, nil } @@ -352,16 +518,47 @@ func (m *BlockMetadata) Size() (n int) { } var l int _ = l + if m.RequestBeginBlock != nil { + l = m.RequestBeginBlock.Size() + n += 1 + l + sovListening(uint64(l)) + } + if m.ResponseBeginBlock != nil { + l = m.ResponseBeginBlock.Size() + n += 1 + l + sovListening(uint64(l)) + } + if len(m.DeliverTxs) > 0 { + for _, e := range m.DeliverTxs { + l = e.Size() + n += 1 + l + sovListening(uint64(l)) + } + } + if m.RequestEndBlock != nil { + l = m.RequestEndBlock.Size() + n += 1 + l + sovListening(uint64(l)) + } + if m.ResponseEndBlock != nil { + l = m.ResponseEndBlock.Size() + n += 1 + l + sovListening(uint64(l)) + } if m.ResponseCommit != nil { l = m.ResponseCommit.Size() n += 1 + l + sovListening(uint64(l)) } - if m.RequestFinalizeBlock != nil { - l = m.RequestFinalizeBlock.Size() + return n +} + +func (m *BlockMetadata_DeliverTx) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Request != nil { + l = m.Request.Size() n += 1 + l + sovListening(uint64(l)) } - if m.ResponseFinalizeBlock != nil { - l = m.ResponseFinalizeBlock.Size() + if m.Response != nil { + l = m.Response.Size() n += 1 + l + sovListening(uint64(l)) } return n @@ -572,6 +769,184 @@ func (m *BlockMetadata) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: BlockMetadata: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestBeginBlock", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthListening + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthListening + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.RequestBeginBlock == nil { + m.RequestBeginBlock = &types.RequestBeginBlock{} + } + 
if err := m.RequestBeginBlock.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ResponseBeginBlock", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthListening + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthListening + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ResponseBeginBlock == nil { + m.ResponseBeginBlock = &types.ResponseBeginBlock{} + } + if err := m.ResponseBeginBlock.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DeliverTxs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthListening + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthListening + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DeliverTxs = append(m.DeliverTxs, &BlockMetadata_DeliverTx{}) + if err := m.DeliverTxs[len(m.DeliverTxs)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestEndBlock", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthListening + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthListening + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.RequestEndBlock == nil { + m.RequestEndBlock = &types.RequestEndBlock{} + } + if err := m.RequestEndBlock.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ResponseEndBlock", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthListening + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthListening + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ResponseEndBlock == nil { + m.ResponseEndBlock = &types.ResponseEndBlock{} + } + if err := m.ResponseEndBlock.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex case 6: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field ResponseCommit", wireType) @@ -602,15 +977,65 @@ func (m *BlockMetadata) Unmarshal(dAtA []byte) error { return io.ErrUnexpectedEOF } if m.ResponseCommit == nil { - m.ResponseCommit = &v1.CommitResponse{} + m.ResponseCommit = 
&types.ResponseCommit{} } if err := m.ResponseCommit.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex - case 7: + default: + iNdEx = preIndex + skippy, err := skipListening(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthListening + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *BlockMetadata_DeliverTx) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DeliverTx: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DeliverTx: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field RequestFinalizeBlock", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Request", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -637,16 +1062,16 @@ func (m *BlockMetadata) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.RequestFinalizeBlock == nil { - m.RequestFinalizeBlock = &v1.FinalizeBlockRequest{} + if m.Request == nil { + m.Request = &types.RequestDeliverTx{} } - if err := m.RequestFinalizeBlock.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + if err := m.Request.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex - case 8: + case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field ResponseFinalizeBlock", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Response", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -673,10 +1098,10 @@ func (m *BlockMetadata) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.ResponseFinalizeBlock == nil { - m.ResponseFinalizeBlock = &v1.FinalizeBlockResponse{} + if m.Response == nil { + m.Response = &types.ResponseDeliverTx{} } - if err := m.ResponseFinalizeBlock.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + if err := m.Response.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex diff --git a/store/types/store.go b/store/types/store.go index 3ff430d76322..8b38c99290e3 100644 --- a/store/types/store.go +++ b/store/types/store.go @@ -159,9 +159,6 @@ type MultiStore interface { // implied that the caller should update the context when necessary between // tracing operations. The modified MultiStore is returned. SetTracingContext(TraceContext) MultiStore - - // LatestVersion returns the latest version in the store - LatestVersion() int64 } // CacheMultiStore is from MultiStore.CacheMultiStore().... @@ -222,6 +219,13 @@ type CommitMultiStore interface { // RollbackToVersion rollback the db to specific version(height). 
RollbackToVersion(version int64) error
+
+	// ListeningEnabled returns whether listening is enabled for the KVStore belonging to the provided StoreKey
+	ListeningEnabled(key StoreKey) bool
+
+	// AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey
+	// It appends the listeners to a current set, if one already exists
+	AddListeners(key StoreKey, listeners []WriteListener)
 }

 //---------subsp-------------------------------
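
---

Note on the file format implied by this patch: the test helpers (`readInFile`, `segmentBytes`, `getHeadSegment`) pin down the on-disk layout of the new single-per-block data file, namely an 8-byte big-endian total-length header followed by uvarint-length-prefixed protobuf-encoded `StoreKVPair` records, grouped per store. Below is a minimal, dependency-free Go sketch of a reader under those assumptions; the file name is illustrative (it follows the `<prefix>-block-<height>-data` pattern used in the test), and unmarshalling each segment into a `StoreKVPair` via the SDK codec is deliberately omitted to keep the sketch self-contained.

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"os"
)

// splitSegments mirrors the test helper segmentBytes: it walks the buffer
// and returns the body of each uvarint-length-prefixed message.
func splitSegments(bz []byte) ([][]byte, error) {
	var segments [][]byte
	for len(bz) > 0 {
		size, n := binary.Uvarint(bz)
		if n <= 0 {
			return nil, errors.New("invalid length prefix")
		}
		end := n + int(size)
		if end > len(bz) {
			return nil, errors.New("truncated segment")
		}
		segments = append(segments, bz[n:end])
		bz = bz[end:]
	}
	return segments, nil
}

func main() {
	// Hypothetical path; real names follow "<prefix>-block-<height>-data".
	bz, err := os.ReadFile("app-block-1-data")
	if err != nil {
		panic(err)
	}
	if len(bz) < 8 {
		panic("missing size header")
	}
	// The writer prepends the payload length as an 8-byte big-endian integer,
	// matching sdk.BigEndianToUint64 in readInFile above.
	size := binary.BigEndian.Uint64(bz[:8])
	if uint64(len(bz)-8) != size {
		panic("incomplete file")
	}
	segments, err := splitSegments(bz[8:])
	if err != nil {
		panic(err)
	}
	// Each segment is a serialized cosmos.base.store.v1beta1.StoreKVPair;
	// unmarshal with the SDK's proto codec in a real consumer.
	fmt.Printf("block data file contains %d StoreKVPair records\n", len(segments))
}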