diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c26381a0b3b..d3a5ddaba064 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -171,6 +171,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (x/gov) [#13045](https://github.com/cosmos/cosmos-sdk/pull/13045) Fix gov migrations for v3(0.46). * (snapshot) [#13400](https://github.com/cosmos/cosmos-sdk/pull/13400) Fix snapshot checksum issue in golang 1.19. * (store) [#13459](https://github.com/cosmos/cosmos-sdk/pull/13459) Don't let state listener observe the uncommitted writes. +* (store) [#13476](https://github.com/cosmos/cosmos-sdk/pull/13476) fix state listener observe writes at wrong time. ### Deprecated diff --git a/baseapp/abci.go b/baseapp/abci.go index 4f52136fac10..c3f45763ee22 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -53,21 +53,25 @@ func (app *BaseApp) InitChain(req abci.RequestInitChain) (res abci.ResponseInitC app.setDeliverState(initHeader) app.setCheckState(initHeader) + // add block gas meter for any genesis transactions (allow infinite gas) + app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(sdk.NewInfiniteGasMeter()) + + // wrap in a cache context for state listening to work. + cacheCtx, write := app.deliverState.ctx.CacheContextWithListeners(app.writeListeners) + // Store the consensus params in the BaseApp's paramstore. Note, this must be // done after the deliver state and context have been set as it's persisted // to state. if req.ConsensusParams != nil { - app.StoreConsensusParams(app.deliverState.ctx, req.ConsensusParams) + app.StoreConsensusParams(cacheCtx, req.ConsensusParams) } if app.initChainer == nil { return } - // add block gas meter for any genesis transactions (allow infinite gas) - app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(sdk.NewInfiniteGasMeter()) - - res = app.initChainer(app.deliverState.ctx, req) + res = app.initChainer(cacheCtx, req) + write() // sanity check if len(req.Validators) > 0 { @@ -191,7 +195,12 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg } if app.beginBlocker != nil { - res = app.beginBlocker(app.deliverState.ctx, req) + // wrap in a cache context for state listening to work. + cacheCtx, write := app.deliverState.ctx.CacheContextWithListeners(app.writeListeners) + + res = app.beginBlocker(cacheCtx, req) + + write() res.Events = sdk.MarkEventsToIndex(res.Events, app.indexEvents) } // set the signed validators for addition to context in deliverTx @@ -214,7 +223,12 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc } if app.endBlocker != nil { - res = app.endBlocker(app.deliverState.ctx, req) + // wrap in a cache context for state listening to work. + cacheCtx, write := app.deliverState.ctx.CacheContextWithListeners(app.writeListeners) + + res = app.endBlocker(cacheCtx, req) + + write() res.Events = sdk.MarkEventsToIndex(res.Events, app.indexEvents) } diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index b9618ca989b2..a26430b34a04 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -133,6 +133,8 @@ type BaseApp struct { //nolint: maligned // abciListeners for hooking into the ABCI message processing of the BaseApp // and exposing the requests and responses to external consumers abciListeners []ABCIListener + + writeListeners map[storetypes.StoreKey][]storetypes.WriteListener } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a @@ -153,6 +155,7 @@ func NewBaseApp( msgServiceRouter: NewMsgServiceRouter(), txDecoder: txDecoder, fauxMerkleMode: false, + writeListeners: make(map[storetypes.StoreKey][]storetypes.WriteListener), } for _, option := range options { @@ -533,7 +536,7 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context // cacheTxContext returns a new context based off of the provided context with // a branched multi-store. -func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, sdk.CacheMultiStore) { +func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte, mode runTxMode) (sdk.Context, sdk.CacheMultiStore) { ms := ctx.MultiStore() // TODO: https://github.com/cosmos/cosmos-sdk/issues/2824 msCache := ms.CacheMultiStore() @@ -547,6 +550,11 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context ).(sdk.CacheMultiStore) } + // enable state listeners in deliver mode + if mode == runTxModeDeliver { + msCache = msCache.SetListeners(app.writeListeners) + } + return ctx.WithMultiStore(msCache), msCache } @@ -624,7 +632,7 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re // NOTE: Alternatively, we could require that AnteHandler ensures that // writes do not happen if aborted/failed. This may have some // performance benefits, but it'll be more difficult to get right. - anteCtx, msCache = app.cacheTxContext(ctx, txBytes) + anteCtx, msCache = app.cacheTxContext(ctx, txBytes, mode) anteCtx = anteCtx.WithEventManager(sdk.NewEventManager()) newCtx, err := app.anteHandler(anteCtx, tx, mode == runTxModeSimulate) @@ -663,7 +671,7 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re // Create a new Context based off of the existing Context with a MultiStore branch // in case message processing fails. At this point, the MultiStore // is a branch of a branch. - runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes) + runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes, mode) // Attempt to execute all messages and only update state if all messages pass // and we're in DeliverTx. Note, runMsgs will never return a reference to a @@ -779,3 +787,12 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, mode runTxMode) (*s func makeABCIData(msgResponses []*codectypes.Any) ([]byte, error) { return proto.Marshal(&sdk.TxMsgData{MsgResponses: msgResponses}) } + +// AddListeners registers listeners for a specific KVStore +func (app *BaseApp) AddListeners(key storetypes.StoreKey, listeners []storetypes.WriteListener) { + if ls, ok := app.writeListeners[key]; ok { + app.writeListeners[key] = append(ls, listeners...) + } else { + app.writeListeners[key] = listeners + } +} diff --git a/baseapp/options.go b/baseapp/options.go index f9a67f186d7c..e1b454b451d6 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -228,8 +228,9 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) { // SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore func (app *BaseApp) SetStreamingService(s StreamingService) { // add the listeners for each StoreKey + // register the listeners on app, which will be passed to cache store when necessary. for key, lis := range s.Listeners() { - app.cms.AddListeners(key, lis) + app.AddListeners(key, lis) } // register the StreamingService within the BaseApp // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 88fca0315518..229b8229a097 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -2,6 +2,7 @@ ## Changelog +* 10/11/2022: Explicitly enable listeners on specific cache stores to prevent duplicated observations on nested cache stores. * 11/23/2020: Initial draft ## Status @@ -20,12 +21,12 @@ In addition to these request/response queries, it would be beneficial to have a ## Decision -We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores. +We will modify the `CacheMultiStore` interface and its concrete (`cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores, the listeners are notified when the `CacheMultiStore` flush the dirty writes to the underlying store in the `Write()`. We will introduce a plugin system for configuring and running streaming services that write these state changes and their surrounding ABCI message context to different destinations. ### Listening interface -In a new file, `store/types/listening.go`, we will create a `WriteListener` interface for streaming out state changes from a KVStore. +In a new file, `store/types/listening.go`, we will create a `WriteListener` interface for streaming out state changes from a `KVStore`. ```go // WriteListener interface for streaming data out from a listenkv.Store @@ -137,19 +138,19 @@ func (s *Store) onWrite(delete bool, key, value []byte) { ### MultiStore interface updates -We will update the `MultiStore` interface to allow us to wrap a set of listeners around a specific `KVStore`. +We will update the `CacheMultiStore` interface to allow us to wrap a set of listeners around a specific `KVStore`. Additionally, we will update the `CacheWrap` and `CacheWrapper` interfaces to enable listening in the caching layer. ```go -type MultiStore interface { +type CacheMultiStore interface { ... // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey ListeningEnabled(key StoreKey) bool - // AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey - // It appends the listeners to a current set, if one already exists - AddListeners(key StoreKey, listeners []WriteListener) + // SetListeners set the state listeners for the KVStores. + // It override existing ones and re-wire the kv stores. + SetListeners(key StoreKey, listeners []WriteListener) } ``` @@ -169,37 +170,54 @@ type CacheWrapper interface { } ``` -### MultiStore implementation updates +### CacheMultiStore implementation updates -We will modify all of the `Store` and `MultiStore` implementations to satisfy these new interfaces, and adjust the `rootmulti` `GetKVStore` method -to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on for that `Store`. +We will modify all of the `Store` and `CacheMultiStore` implementations to satisfy these new interfaces. -```go -func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { - store := rs.stores[key].(types.KVStore) +### Context changes - if rs.TracingEnabled() { - store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext) - } - if rs.ListeningEnabled(key) { - store = listenkv.NewStore(key, store, rs.listeners[key]) - } +We add a new method `CacheContextWithListeners` to `sdk.Context` to keep it convenient to enable listeners on the cache store. - return store -} -``` +### When to listen -We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to forward the listeners -to and enable listening in the cache layer. +cosmos-sdk use multiple layers of cache stores internally, we should only listen to one of them to avoid duplicated notifications, the most inner layer is the `app.deliverState.ctx` which writes to `rootmulti` at commit event, to be able to segment the writes with the ABCI events and transactions, we should listener to the second layer. We also need to wrap the consensus state machine logic in a second layer of cache store with listeners enabled, for example: -```go -func (rs *Store) CacheMultiStore() types.CacheMultiStore { - stores := make(map[types.StoreKey]types.CacheWrapper) - for k, v := range rs.stores { - stores[k] = v - } - return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners) -} +``` +InitChain: + // branch out from cms, inner cache layer, without listeners + setDeliverState() + + // second cache layer, with listeners + cacheCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners) + initChainer(cacheCtx) + + // listeners are notified here + write() + +BeginBlocker: + setDeliverState() + + cacheCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners) + beginBlockers(cacheCtx) + write() + +DeliverTx: + anteCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners) + anteHandler(anteCtx) + write() + + runMsgsCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners) + runMsgs(runMsgsCtx) + write() + +EndBlocker: + cacheCtx, write := deliverState.ctx.CacheContextWithListeners(app.writeListeners) + endBlocker(cacheCtx) + write() + +Commit: + deliverState.ms.Write() + app.cms.Commit() ``` ### Exposing the data diff --git a/server/mock/store.go b/server/mock/store.go index 51d44ed9d215..a9ed601694df 100644 --- a/server/mock/store.go +++ b/server/mock/store.go @@ -50,7 +50,7 @@ func (ms multiStore) SetTracer(w io.Writer) sdk.MultiStore { panic("not implemented") } -func (ms multiStore) AddListeners(key storetypes.StoreKey, listeners []storetypes.WriteListener) { +func (ms multiStore) SetListeners(listeners map[storetypes.StoreKey][]storetypes.WriteListener) storetypes.MultiStore { panic("not implemented") } diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go index deb1d46272dd..7c4f928584f3 100644 --- a/store/cachemulti/store.go +++ b/store/cachemulti/store.go @@ -25,9 +25,10 @@ const storeNameCtxKey = "store_name" // NOTE: a Store (and MultiStores in general) should never expose the // keys for the substores. type Store struct { - db types.CacheKVStore - stores map[types.StoreKey]types.CacheWrap - keys map[string]types.StoreKey + db types.CacheKVStore + originalStores map[types.StoreKey]types.CacheWrapper + stores map[types.StoreKey]types.CacheWrap + keys map[string]types.StoreKey traceWriter io.Writer traceContext types.TraceContext @@ -43,34 +44,16 @@ var _ types.CacheMultiStore = Store{} func NewFromKVStore( store types.KVStore, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext, - listeners map[types.StoreKey][]types.WriteListener, ) Store { - if listeners == nil { - listeners = make(map[types.StoreKey][]types.WriteListener) - } cms := Store{ - db: cachekv.NewStore(store), - stores: make(map[types.StoreKey]types.CacheWrap, len(stores)), - keys: keys, - traceWriter: traceWriter, - traceContext: traceContext, - listeners: listeners, - } - - for key, store := range stores { - if cms.TracingEnabled() { - tctx := cms.traceContext.Clone().Merge(types.TraceContext{ - storeNameCtxKey: key.Name(), - }) - - store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx) - } - if cms.ListeningEnabled(key) { - store = listenkv.NewStore(store.(types.KVStore), key, listeners[key]) - } - cms.stores[key] = cachekv.NewStore(store.(types.KVStore)) + db: cachekv.NewStore(store), + originalStores: stores, + keys: keys, + traceWriter: traceWriter, + traceContext: traceContext, } + cms.stores = cms.recreateStores() return cms } @@ -78,9 +61,9 @@ func NewFromKVStore( // CacheWrapper objects. Each CacheWrapper store is a branched store. func NewStore( db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey, - traceWriter io.Writer, traceContext types.TraceContext, listeners map[types.StoreKey][]types.WriteListener, + traceWriter io.Writer, traceContext types.TraceContext, ) Store { - return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext, listeners) + return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext) } func newCacheMultiStoreFromCMS(cms Store) Store { @@ -89,8 +72,7 @@ func newCacheMultiStoreFromCMS(cms Store) Store { stores[k] = v } - // don't pass listeners to nested cache store. - return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext, nil) + return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext) } // SetTracer sets the tracer for the MultiStore that the underlying @@ -121,17 +103,37 @@ func (cms Store) TracingEnabled() bool { return cms.traceWriter != nil } -// AddListeners adds listeners for a specific KVStore -func (cms Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) { - if ls, ok := cms.listeners[key]; ok { - cms.listeners[key] = append(ls, listeners...) - } else { - cms.listeners[key] = listeners +// SetListeners reset all the state listeners and re-wire the kv stores. +func (cms Store) SetListeners(listeners map[types.StoreKey][]types.WriteListener) types.CacheMultiStore { + cms.listeners = listeners + cms.stores = cms.recreateStores() + return cms +} + +// recreateStores recreate the wrapping stores when configuration changed. +func (cms Store) recreateStores() (stores map[types.StoreKey]types.CacheWrap) { + stores = make(map[types.StoreKey]types.CacheWrap, len(cms.originalStores)) + for key, store := range cms.originalStores { + if cms.TracingEnabled() { + tctx := cms.traceContext.Clone().Merge(types.TraceContext{ + storeNameCtxKey: key.Name(), + }) + + store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx) + } + if cms.ListeningEnabled(key) { + store = listenkv.NewStore(store.(types.KVStore), key, cms.listeners[key]) + } + stores[key] = cachekv.NewStore(store.(types.KVStore)) } + return } // ListeningEnabled returns if listening is enabled for a specific KVStore func (cms Store) ListeningEnabled(key types.StoreKey) bool { + if cms.listeners == nil { + return false + } if ls, ok := cms.listeners[key]; ok { return len(ls) != 0 } diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index 0f652d24bab5..c1b441f01212 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -22,7 +22,6 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/dbadapter" "github.com/cosmos/cosmos-sdk/store/iavl" - "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/mem" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/transient" @@ -71,8 +70,6 @@ type Store struct { traceContextMutex sync.Mutex interBlockCache types.MultiStorePersistentCache - - listeners map[types.StoreKey][]types.WriteListener } var ( @@ -93,7 +90,6 @@ func NewStore(db dbm.DB, logger log.Logger) *Store { storesParams: make(map[types.StoreKey]storeParams), stores: make(map[types.StoreKey]types.CommitKVStore), keysByName: make(map[string]types.StoreKey), - listeners: make(map[types.StoreKey][]types.WriteListener), removalMap: make(map[types.StoreKey]bool), pruningManager: pruning.NewManager(db, logger), } @@ -390,23 +386,6 @@ func (rs *Store) TracingEnabled() bool { return rs.traceWriter != nil } -// AddListeners adds listeners for a specific KVStore -func (rs *Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) { - if ls, ok := rs.listeners[key]; ok { - rs.listeners[key] = append(ls, listeners...) - } else { - rs.listeners[key] = listeners - } -} - -// ListeningEnabled returns if listening is enabled for a specific KVStore -func (rs *Store) ListeningEnabled(key types.StoreKey) bool { - if ls, ok := rs.listeners[key]; ok { - return len(ls) != 0 - } - return false -} - // LastCommitID implements Committer/CommitStore. func (rs *Store) LastCommitID() types.CommitID { if rs.lastCommitInfo == nil { @@ -481,7 +460,7 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { for k, v := range rs.stores { stores[k] = v } - return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.getTracingContext(), rs.listeners) + return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.getTracingContext()) } // CacheMultiStoreWithVersion is analogous to CacheMultiStore except that it @@ -511,7 +490,7 @@ func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStor } } - return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.getTracingContext(), rs.listeners), nil + return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.getTracingContext()), nil } // GetStore returns a mounted Store for a given StoreKey. If the StoreKey does @@ -545,9 +524,6 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { if rs.TracingEnabled() { store = tracekv.NewStore(store, rs.traceWriter, rs.getTracingContext()) } - if rs.ListeningEnabled(key) { - store = listenkv.NewStore(store, key, rs.listeners[key]) - } return store } diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go index a219593fbb86..73f213a76f41 100644 --- a/store/rootmulti/store_test.go +++ b/store/rootmulti/store_test.go @@ -17,7 +17,6 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/iavl" sdkmaps "github.com/cosmos/cosmos-sdk/store/internal/maps" - "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) @@ -625,27 +624,6 @@ func TestSetInitialVersion(t *testing.T) { require.True(t, iavlStore.VersionExists(5)) } -func TestAddListenersAndListeningEnabled(t *testing.T) { - db := dbm.NewMemDB() - multi := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) - testKey := types.NewKVStoreKey("listening_test_key") - enabled := multi.ListeningEnabled(testKey) - require.False(t, enabled) - - multi.AddListeners(testKey, []types.WriteListener{}) - enabled = multi.ListeningEnabled(testKey) - require.False(t, enabled) - - mockListener := types.NewStoreKVPairWriteListener(nil, nil) - multi.AddListeners(testKey, []types.WriteListener{mockListener}) - wrongTestKey := types.NewKVStoreKey("wrong_listening_test_key") - enabled = multi.ListeningEnabled(wrongTestKey) - require.False(t, enabled) - - enabled = multi.ListeningEnabled(testKey) - require.True(t, enabled) -} - var ( interfaceRegistry = codecTypes.NewInterfaceRegistry() testMarshaller = codec.NewProtoCodec(interfaceRegistry) @@ -655,83 +633,6 @@ var ( testValue2 = []byte{6, 5, 4, 3, 2} ) -func TestGetListenWrappedKVStore(t *testing.T) { - buf := new(bytes.Buffer) - var db dbm.DB = dbm.NewMemDB() - ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) - ms.LoadLatestVersion() - mockListeners := []types.WriteListener{types.NewStoreKVPairWriteListener(buf, testMarshaller)} - ms.AddListeners(testStoreKey1, mockListeners) - ms.AddListeners(testStoreKey2, mockListeners) - - listenWrappedStore1 := ms.GetKVStore(testStoreKey1) - require.IsType(t, &listenkv.Store{}, listenWrappedStore1) - - listenWrappedStore1.Set(testKey1, testValue1) - expectedOutputKVPairSet1, err := testMarshaller.MarshalLengthPrefixed(&types.StoreKVPair{ - Key: testKey1, - Value: testValue1, - StoreKey: testStoreKey1.Name(), - Delete: false, - }) - require.Nil(t, err) - kvPairSet1Bytes := buf.Bytes() - buf.Reset() - require.Equal(t, expectedOutputKVPairSet1, kvPairSet1Bytes) - - listenWrappedStore1.Delete(testKey1) - expectedOutputKVPairDelete1, err := testMarshaller.MarshalLengthPrefixed(&types.StoreKVPair{ - Key: testKey1, - Value: nil, - StoreKey: testStoreKey1.Name(), - Delete: true, - }) - require.Nil(t, err) - kvPairDelete1Bytes := buf.Bytes() - buf.Reset() - require.Equal(t, expectedOutputKVPairDelete1, kvPairDelete1Bytes) - - listenWrappedStore2 := ms.GetKVStore(testStoreKey2) - require.IsType(t, &listenkv.Store{}, listenWrappedStore2) - - listenWrappedStore2.Set(testKey2, testValue2) - expectedOutputKVPairSet2, err := testMarshaller.MarshalLengthPrefixed(&types.StoreKVPair{ - Key: testKey2, - Value: testValue2, - StoreKey: testStoreKey2.Name(), - Delete: false, - }) - require.NoError(t, err) - kvPairSet2Bytes := buf.Bytes() - buf.Reset() - require.Equal(t, expectedOutputKVPairSet2, kvPairSet2Bytes) - - listenWrappedStore2.Delete(testKey2) - expectedOutputKVPairDelete2, err := testMarshaller.MarshalLengthPrefixed(&types.StoreKVPair{ - Key: testKey2, - Value: nil, - StoreKey: testStoreKey2.Name(), - Delete: true, - }) - require.NoError(t, err) - kvPairDelete2Bytes := buf.Bytes() - buf.Reset() - require.Equal(t, expectedOutputKVPairDelete2, kvPairDelete2Bytes) - - unwrappedStore := ms.GetKVStore(testStoreKey3) - require.IsType(t, &iavl.Store{}, unwrappedStore) - - unwrappedStore.Set(testKey2, testValue2) - kvPairSet3Bytes := buf.Bytes() - buf.Reset() - require.Equal(t, []byte{}, kvPairSet3Bytes) - - unwrappedStore.Delete(testKey2) - kvPairDelete3Bytes := buf.Bytes() - buf.Reset() - require.Equal(t, []byte{}, kvPairDelete3Bytes) -} - func TestCacheWraps(t *testing.T) { db := dbm.NewMemDB() multi := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) @@ -932,10 +833,13 @@ func TestStateListeners(t *testing.T) { ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) listener := &MockListener{} - ms.AddListeners(testStoreKey1, []types.WriteListener{listener}) + listeners := map[types.StoreKey][]types.WriteListener{ + testStoreKey1: {listener}, + } require.NoError(t, ms.LoadLatestVersion()) cacheMulti := ms.CacheMultiStore() + cacheMulti = cacheMulti.SetListeners(listeners) store1 := cacheMulti.GetKVStore(testStoreKey1) store1.Set([]byte{1}, []byte{1}) @@ -953,7 +857,7 @@ func TestStateListeners(t *testing.T) { store1.Set([]byte{1}, []byte{1}) require.Empty(t, listener.stateCache) - // writes are not observed when nested cache store commit + // writes are not observed when nested cache store don't enable listeners explicitly. nested.Write() require.Empty(t, listener.stateCache) diff --git a/store/types/store.go b/store/types/store.go index e3c0f0822ad2..7a65a670b520 100644 --- a/store/types/store.go +++ b/store/types/store.go @@ -127,19 +127,18 @@ type MultiStore interface { // implied that the caller should update the context when necessary between // tracing operations. The modified MultiStore is returned. SetTracingContext(TraceContext) MultiStore - - // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey - ListeningEnabled(key StoreKey) bool - - // AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey - // It appends the listeners to a current set, if one already exists - AddListeners(key StoreKey, listeners []WriteListener) } // From MultiStore.CacheMultiStore().... type CacheMultiStore interface { MultiStore Write() // Writes operations to underlying KVStore + + // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey + ListeningEnabled(key StoreKey) bool + + // SetListeners reset all the state listeners and re-wire the kv stores. + SetListeners(listeners map[StoreKey][]WriteListener) CacheMultiStore } // CommitMultiStore is an interface for a MultiStore without cache capabilities. diff --git a/types/context.go b/types/context.go index fddee22f950f..e7d300c8428b 100644 --- a/types/context.go +++ b/types/context.go @@ -271,7 +271,14 @@ func (c Context) TransientStore(key storetypes.StoreKey) KVStore { // is called. Note, events are automatically emitted on the parent context's // EventManager when the caller executes the write. func (c Context) CacheContext() (cc Context, writeCache func()) { + return c.CacheContextWithListeners(nil) +} + +func (c Context) CacheContextWithListeners(listeners map[storetypes.StoreKey][]storetypes.WriteListener) (cc Context, writeCache func()) { cms := c.MultiStore().CacheMultiStore() + if len(listeners) > 0 { + cms = cms.SetListeners(listeners) + } cc = c.WithMultiStore(cms).WithEventManager(NewEventManager()) writeCache = func() {