Skip to content

Commit

Permalink
fix: state listener observe writes at wrong time
Browse files Browse the repository at this point in the history
Closes: cosmos#13457 (again)
currently state listener only observe events at commit event,
it should listen to the second layer of cache store and this layer only,
and restructure the code to always use a second layer of cache store.

revise ADR-038 spec
  • Loading branch information
yihuang committed Oct 11, 2022
1 parent f3a558c commit b7a82d1
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 215 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (x/gov) [#13045](https://github.com/cosmos/cosmos-sdk/pull/13045) Fix gov migrations for v3(0.46).
* (snapshot) [#13400](https://github.com/cosmos/cosmos-sdk/pull/13400) Fix snapshot checksum issue in golang 1.19.
* (store) [#13459](https://github.com/cosmos/cosmos-sdk/pull/13459) Don't let state listener observe the uncommitted writes.
* (store) [#13476](https://github.com/cosmos/cosmos-sdk/pull/13476) fix state listener observe writes at wrong time.

### Deprecated

Expand Down
28 changes: 21 additions & 7 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,25 @@ func (app *BaseApp) InitChain(req abci.RequestInitChain) (res abci.ResponseInitC
app.setDeliverState(initHeader)
app.setCheckState(initHeader)

// add block gas meter for any genesis transactions (allow infinite gas)
app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(sdk.NewInfiniteGasMeter())

// wrap in a cache context for state listening to work.
cacheCtx, write := app.deliverState.ctx.CacheContextWithListeners(app.writeListeners)

// Store the consensus params in the BaseApp's paramstore. Note, this must be
// done after the deliver state and context have been set as it's persisted
// to state.
if req.ConsensusParams != nil {
app.StoreConsensusParams(app.deliverState.ctx, req.ConsensusParams)
app.StoreConsensusParams(cacheCtx, req.ConsensusParams)
}

if app.initChainer == nil {
return
}

// add block gas meter for any genesis transactions (allow infinite gas)
app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(sdk.NewInfiniteGasMeter())

res = app.initChainer(app.deliverState.ctx, req)
res = app.initChainer(cacheCtx, req)
write()

// sanity check
if len(req.Validators) > 0 {
Expand Down Expand Up @@ -191,7 +195,12 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
}

if app.beginBlocker != nil {
res = app.beginBlocker(app.deliverState.ctx, req)
// wrap in a cache context for state listening to work.
cacheCtx, write := app.deliverState.ctx.CacheContextWithListeners(app.writeListeners)

res = app.beginBlocker(cacheCtx, req)

write()
res.Events = sdk.MarkEventsToIndex(res.Events, app.indexEvents)
}
// set the signed validators for addition to context in deliverTx
Expand All @@ -214,7 +223,12 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
}

if app.endBlocker != nil {
res = app.endBlocker(app.deliverState.ctx, req)
// wrap in a cache context for state listening to work.
cacheCtx, write := app.deliverState.ctx.CacheContextWithListeners(app.writeListeners)

res = app.endBlocker(cacheCtx, req)

write()
res.Events = sdk.MarkEventsToIndex(res.Events, app.indexEvents)
}

Expand Down
23 changes: 20 additions & 3 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ type BaseApp struct { //nolint: maligned
// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener

writeListeners map[storetypes.StoreKey][]storetypes.WriteListener
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand All @@ -153,6 +155,7 @@ func NewBaseApp(
msgServiceRouter: NewMsgServiceRouter(),
txDecoder: txDecoder,
fauxMerkleMode: false,
writeListeners: make(map[storetypes.StoreKey][]storetypes.WriteListener),
}

for _, option := range options {
Expand Down Expand Up @@ -533,7 +536,7 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, sdk.CacheMultiStore) {
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte, mode runTxMode) (sdk.Context, sdk.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
msCache := ms.CacheMultiStore()
Expand All @@ -547,6 +550,11 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
).(sdk.CacheMultiStore)
}

// enable state listeners in deliver mode
if mode == runTxModeDeliver {
msCache = msCache.SetListeners(app.writeListeners)
}

return ctx.WithMultiStore(msCache), msCache
}

Expand Down Expand Up @@ -624,7 +632,7 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
// NOTE: Alternatively, we could require that AnteHandler ensures that
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
anteCtx, msCache = app.cacheTxContext(ctx, txBytes, mode)
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
newCtx, err := app.anteHandler(anteCtx, tx, mode == runTxModeSimulate)

Expand Down Expand Up @@ -663,7 +671,7 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
// Create a new Context based off of the existing Context with a MultiStore branch
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes)
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes, mode)

// Attempt to execute all messages and only update state if all messages pass
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
Expand Down Expand Up @@ -779,3 +787,12 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, mode runTxMode) (*s
func makeABCIData(msgResponses []*codectypes.Any) ([]byte, error) {
return proto.Marshal(&sdk.TxMsgData{MsgResponses: msgResponses})
}

// AddListeners registers listeners for a specific KVStore
func (app *BaseApp) AddListeners(key storetypes.StoreKey, listeners []storetypes.WriteListener) {
if ls, ok := app.writeListeners[key]; ok {
app.writeListeners[key] = append(ls, listeners...)
} else {
app.writeListeners[key] = listeners
}
}
3 changes: 2 additions & 1 deletion baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,9 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
// register the listeners on app, which will be passed to cache store when necessary.
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
app.AddListeners(key, lis)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
Expand Down
82 changes: 50 additions & 32 deletions docs/architecture/adr-038-state-listening.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Changelog

* 10/11/2022: Explicitly enable listeners on specific cache stores to prevent duplicated observations on nested cache stores.
* 11/23/2020: Initial draft

## Status
Expand All @@ -20,12 +21,12 @@ In addition to these request/response queries, it would be beneficial to have a

## Decision

We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores.
We will modify the `CacheMultiStore` interface and its concrete (`cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores, the listeners are notified when the `CacheMultiStore` flush the dirty writes to the underlying store in the `Write()`.
We will introduce a plugin system for configuring and running streaming services that write these state changes and their surrounding ABCI message context to different destinations.

### Listening interface

In a new file, `store/types/listening.go`, we will create a `WriteListener` interface for streaming out state changes from a KVStore.
In a new file, `store/types/listening.go`, we will create a `WriteListener` interface for streaming out state changes from a `KVStore`.

```go
// WriteListener interface for streaming data out from a listenkv.Store
Expand Down Expand Up @@ -137,19 +138,19 @@ func (s *Store) onWrite(delete bool, key, value []byte) {

### MultiStore interface updates

We will update the `MultiStore` interface to allow us to wrap a set of listeners around a specific `KVStore`.
We will update the `CacheMultiStore` interface to allow us to wrap a set of listeners around a specific `KVStore`.
Additionally, we will update the `CacheWrap` and `CacheWrapper` interfaces to enable listening in the caching layer.

```go
type MultiStore interface {
type CacheMultiStore interface {
...

// ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey
ListeningEnabled(key StoreKey) bool

// AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey
// It appends the listeners to a current set, if one already exists
AddListeners(key StoreKey, listeners []WriteListener)
// SetListeners set the state listeners for the KVStores.
// It override existing ones and re-wire the kv stores.
SetListeners(key StoreKey, listeners []WriteListener)
}
```

Expand All @@ -169,37 +170,54 @@ type CacheWrapper interface {
}
```

### MultiStore implementation updates
### CacheMultiStore implementation updates

We will modify all of the `Store` and `MultiStore` implementations to satisfy these new interfaces, and adjust the `rootmulti` `GetKVStore` method
to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on for that `Store`.
We will modify all of the `Store` and `CacheMultiStore` implementations to satisfy these new interfaces.

```go
func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
store := rs.stores[key].(types.KVStore)
### Context changes

if rs.TracingEnabled() {
store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext)
}
if rs.ListeningEnabled(key) {
store = listenkv.NewStore(key, store, rs.listeners[key])
}
We add a new method `CacheContextWithListeners` to `sdk.Context` to keep it convenient to enable listeners on the cache store.

return store
}
```
### When to listen

We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to forward the listeners
to and enable listening in the cache layer.
cosmos-sdk use multiple layers of cache stores internally, we should only listen to one of them to avoid duplicated notifications, the most inner layer is the `app.deliverState.ctx` which writes to `rootmulti` at commit event, to be able to segment the writes with the ABCI events and transactions, we should listener to the second layer. We also need to wrap the consensus state machine logic in a second layer of cache store with listeners enabled, for example:

```go
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
stores := make(map[types.StoreKey]types.CacheWrapper)
for k, v := range rs.stores {
stores[k] = v
}
return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners)
}
```
InitChain:
// branch out from cms, inner cache layer, without listeners
setDeliverState()
// second cache layer, with listeners
cacheCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners)
initChainer(cacheCtx)
// listeners are notified here
write()
BeginBlocker:
setDeliverState()
cacheCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners)
beginBlockers(cacheCtx)
write()
DeliverTx:
anteCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners)
anteHandler(anteCtx)
write()
runMsgsCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners)
runMsgs(runMsgsCtx)
write()
EndBlocker:
cacheCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners)
endBlocker(cacheCtx)
write()
Commit:
deliverState.ms.Write()
app.cms.Commit()
```

### Exposing the data
Expand Down
2 changes: 1 addition & 1 deletion server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (ms multiStore) SetTracer(w io.Writer) sdk.MultiStore {
panic("not implemented")
}

func (ms multiStore) AddListeners(key storetypes.StoreKey, listeners []storetypes.WriteListener) {
func (ms multiStore) SetListeners(listeners map[storetypes.StoreKey][]storetypes.WriteListener) storetypes.MultiStore {
panic("not implemented")
}

Expand Down
Loading

0 comments on commit b7a82d1

Please sign in to comment.