diff --git a/pruning/manager.go b/pruning/manager.go index a1af49b6b772..b527ac3e3623 100644 --- a/pruning/manager.go +++ b/pruning/manager.go @@ -14,6 +14,7 @@ import ( type Manager struct { logger log.Logger + db dbm.DB opts *types.PruningOptions snapshotInterval uint64 pruneHeights []int64 @@ -24,11 +25,14 @@ type Manager struct { const ( pruneHeightsKey = "s/pruneheights" pruneSnapshotHeightsKey = "s/pruneSnheights" + + uint64Size = 8 ) -func NewManager(logger log.Logger) *Manager { +func NewManager(logger log.Logger, db dbm.DB) *Manager { return &Manager{ logger: logger, + db: db, opts: types.NewPruningOptions(types.PruningNothing), pruneHeights: []int64{}, // These are the heights that are multiples of snapshotInterval and kept for state sync snapshots. @@ -120,12 +124,18 @@ func (m *Manager) ShouldPruneAtHeight(height int64) bool { } // FlushPruningHeights flushes the pruning heights to the database for crash recovery. -func (m *Manager) FlushPruningHeights(batch dbm.Batch) { +func (m *Manager) FlushPruningHeights() { if m.opts.GetPruningStrategy() == types.PruningNothing { return } + batch := m.db.NewBatch() + defer batch.Close() m.flushPruningHeights(batch) m.flushPruningSnapshotHeights(batch) + + if err := batch.WriteSync(); err != nil { + panic(fmt.Errorf("error on batch write %w", err)) + } } // LoadPruningHeights loads the pruning heights from the database as a crash recovery. @@ -193,24 +203,28 @@ func (m *Manager) loadPruningSnapshotHeights(db dbm.DB) error { } func (m *Manager) flushPruningHeights(batch dbm.Batch) { - bz := make([]byte, 0) + bz := make([]byte, 0, len(m.pruneHeights) * uint64Size) for _, ph := range m.pruneHeights { - buf := make([]byte, 8) + buf := make([]byte, uint64Size) binary.BigEndian.PutUint64(buf, uint64(ph)) bz = append(bz, buf...) } - batch.Set([]byte(pruneHeightsKey), bz) + if err := batch.Set([]byte(pruneHeightsKey), bz); err != nil { + panic(err) + } } func (m *Manager) flushPruningSnapshotHeights(batch dbm.Batch) { m.mx.Lock() defer m.mx.Unlock() - bz := make([]byte, 0) + bz := make([]byte, 0, m.pruneSnapshotHeights.Len() * uint64Size) for e := m.pruneSnapshotHeights.Front(); e != nil; e = e.Next() { - buf := make([]byte, 8) + buf := make([]byte, uint64Size) binary.BigEndian.PutUint64(buf, uint64(e.Value.(int64))) bz = append(bz, buf...) } - batch.Set([]byte(pruneSnapshotHeightsKey), bz) + if err := batch.Set([]byte(pruneSnapshotHeightsKey), bz); err != nil { + panic(err) + } } diff --git a/pruning/manager_test.go b/pruning/manager_test.go index ddb96ddfe81d..edb9624576cb 100644 --- a/pruning/manager_test.go +++ b/pruning/manager_test.go @@ -17,7 +17,7 @@ import ( ) func Test_NewManager(t *testing.T) { - manager := pruning.NewManager(log.NewNopLogger()) + manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB()) require.NotNil(t, manager) require.NotNil(t, manager.GetPruningHeights()) @@ -75,7 +75,7 @@ func Test_Strategies(t *testing.T) { }, } - manager := pruning.NewManager(log.NewNopLogger()) + manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB()) require.NotNil(t, manager) @@ -143,30 +143,34 @@ func Test_Strategies(t *testing.T) { } func Test_FlushLoad(t *testing.T) { - manager := pruning.NewManager(log.NewNopLogger()) + const ( + totalHeights = 1000 + snapshotInterval = uint64(10) + + pruningKeepRecent = 100 + pruningInterval = 15 + ) + + var ( + db = db.NewMemDB() + manager = pruning.NewManager(log.NewNopLogger(), db) + curStrategy = types.NewCustomPruningOptions(pruningKeepRecent, pruningInterval) + heightsToPruneMirror = make([]int64, 0) + ) require.NotNil(t, manager) - - db := db.NewMemDB() - - curStrategy := types.NewCustomPruningOptions(100, 15) - - snapshotInterval := uint64(10) + manager.SetSnapshotInterval(snapshotInterval) manager.SetOptions(curStrategy) require.Equal(t, curStrategy, manager.GetOptions()) - keepRecent := curStrategy.KeepRecent - - heightsToPruneMirror := make([]int64, 0) - for curHeight := int64(0); curHeight < 1000; curHeight++ { handleHeightActual := manager.HandleHeight(curHeight) curHeightStr := fmt.Sprintf("height: %d", curHeight) - if curHeight > int64(keepRecent) && (snapshotInterval != 0 && (curHeight-int64(keepRecent))%int64(snapshotInterval) != 0 || snapshotInterval == 0) { - expectedHandleHeight := curHeight - int64(keepRecent) + if curHeight > int64(pruningKeepRecent) && (snapshotInterval != 0 && (curHeight-int64(pruningKeepRecent))%int64(snapshotInterval) != 0 || snapshotInterval == 0) { + expectedHandleHeight := curHeight - int64(pruningKeepRecent) require.Equal(t, expectedHandleHeight, handleHeightActual, curHeightStr) heightsToPruneMirror = append(heightsToPruneMirror, expectedHandleHeight) } else { @@ -181,10 +185,7 @@ func Test_FlushLoad(t *testing.T) { // N.B.: There is no reason behind the choice of 3. if curHeight%3 == 0 { require.Equal(t, heightsToPruneMirror, manager.GetPruningHeights(), curHeightStr) - batch := db.NewBatch() - manager.FlushPruningHeights(batch) - require.NoError(t, batch.Write()) - require.NoError(t, batch.Close()) + manager.FlushPruningHeights() manager.ResetPruningHeights() require.Equal(t, make([]int64, 0), manager.GetPruningHeights(), curHeightStr) @@ -197,7 +198,7 @@ func Test_FlushLoad(t *testing.T) { } func Test_WithSnapshot(t *testing.T) { - manager := pruning.NewManager(log.NewNopLogger()) + manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB()) require.NotNil(t, manager) curStrategy := types.NewCustomPruningOptions(10, 10) diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index dd22e7432516..3ded8ada42a3 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -8,6 +8,7 @@ import ( "math" "sort" "strings" + "sync" "github.com/cosmos/cosmos-sdk/pruning" pruningTypes "github.com/cosmos/cosmos-sdk/pruning/types" @@ -45,6 +46,13 @@ const ( snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit ) +type storeParams struct { + key types.StoreKey + db dbm.DB + typ types.StoreType + initialVersion uint64 +} + // Store is composed of many CommitStores. Name contrasts with // cacheMultiStore which is used for branching other MultiStores. It implements // the CommitMultiStore interface. @@ -52,6 +60,7 @@ type Store struct { db dbm.DB logger log.Logger lastCommitInfo *types.CommitInfo + mx *sync.RWMutex // mutex to sync access to lastCommitInfo pruningManager *pruning.Manager iavlCacheSize int storesParams map[types.StoreKey]storeParams @@ -86,7 +95,8 @@ func NewStore(db dbm.DB, logger log.Logger) *Store { stores: make(map[types.StoreKey]types.CommitKVStore), keysByName: make(map[string]types.StoreKey), listeners: make(map[types.StoreKey][]types.WriteListener), - pruningManager: pruning.NewManager(logger), + pruningManager: pruning.NewManager(logger, db), + mx: &sync.RWMutex{}, } } @@ -197,7 +207,7 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { // load old data if we are not version 0 if ver != 0 { var err error - cInfo, err = getCommitInfo(rs.db, ver) + cInfo, err = rs.getCommitInfoFromDb(ver) if err != nil { return err } @@ -378,6 +388,8 @@ func (rs *Store) ListeningEnabled(key types.StoreKey) bool { // LastCommitID implements Committer/CommitStore. func (rs *Store) LastCommitID() types.CommitID { + rs.mx.RLock() + defer rs.mx.RUnlock() if rs.lastCommitInfo == nil { return types.CommitID{ Version: getLatestVersion(rs.db), @@ -405,20 +417,18 @@ func (rs *Store) Commit() types.CommitID { version = previousHeight + 1 } - rs.lastCommitInfo = rs.commitStores(version, rs.stores) + newCommitInfo := rs.commitStores(version, rs.stores) + rs.updateLatestCommitInfo(newCommitInfo, version) - var pruneErr error - defer func() { - rs.flushMetadata(rs.db, version, rs.lastCommitInfo) - if pruneErr != nil { - panic(pruneErr) - } - }() - - pruneErr = rs.handlePruning(version) + err := rs.handlePruning(version) + if err != nil { + panic(err) + } + rs.mx.RLock() hash, keys := rs.lastCommitInfo.Hash() - rs.logger.Info("calculated commit hash", "height", version, "commit_hash", hash, "keys", keys) + defer rs.mx.RUnlock() + rs.logger.Info("calculated commit hash", "height", version, "commit_hash", fmt.Sprintf("%X", hash), "keys", keys) return types.CommitID{ Version: version, @@ -520,6 +530,8 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { } func (rs *Store) handlePruning(version int64) error { + defer rs.pruningManager.FlushPruningHeights() + rs.pruningManager.HandleHeight(version - 1) // we should never prune the current version. if rs.pruningManager.ShouldPruneAtHeight(version) { rs.logger.Info("prune start", "height", version) @@ -603,18 +615,10 @@ func (rs *Store) Query(req abci.RequestQuery) abci.ResponseQuery { return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "proof is unexpectedly empty; ensure height has not been pruned")) } - // If the request's height is the latest height we've committed, then utilize - // the store's lastCommitInfo as this commit info may not be flushed to disk. - // Otherwise, we query for the commit info from disk. - var commitInfo *types.CommitInfo - if res.Height == rs.lastCommitInfo.Version { - commitInfo = rs.lastCommitInfo - } else { - commitInfo, err = getCommitInfo(rs.db, res.Height) - if err != nil { - return sdkerrors.QueryResult(err) - } + commitInfo, err := rs.getCommitInfoFromDb(res.Height) + if err != nil { + return sdkerrors.QueryResult(err) } // Restore origin path and append proof op. @@ -887,7 +891,8 @@ func (rs *Store) Restore( importer.Close() } - rs.flushMetadata(rs.db, int64(height), rs.buildCommitInfo(int64(height))) + rs.flushLastCommitInfo(rs.buildCommitInfo(int64(height))) + rs.pruningManager.FlushPruningHeights() return rs.LoadLatestVersion() } @@ -992,30 +997,46 @@ func (rs *Store) commitStores(version int64, storeMap map[types.StoreKey]types.C } } -func (rs *Store) flushMetadata(db dbm.DB, version int64, cInfo *types.CommitInfo) { - rs.logger.Debug("flushing metadata", "height", version) - batch := db.NewBatch() +func (rs *Store) updateLatestCommitInfo(newCommitInfo *types.CommitInfo, version int64) { + rs.mx.Lock() + defer rs.mx.Unlock() + rs.lastCommitInfo = newCommitInfo + rs.flushLastCommitInfo(newCommitInfo) +} + +func (rs *Store) flushLastCommitInfo(cInfo *types.CommitInfo) { + batch := rs.db.NewBatch() defer batch.Close() - flushCommitInfo(batch, version, cInfo) - flushLatestVersion(batch, version) - rs.pruningManager.FlushPruningHeights(batch) + setCommitInfo(batch, cInfo) + setLatestVersion(batch, cInfo.Version) - if err := batch.Write(); err != nil { + if err := batch.WriteSync(); err != nil { panic(fmt.Errorf("error on batch write %w", err)) } - rs.logger.Debug("flushing metadata finished", "height", version) } -type storeParams struct { - key types.StoreKey - db dbm.DB - typ types.StoreType - initialVersion uint64 +// Gets commitInfo from disk. +func (rs *Store) getCommitInfoFromDb(ver int64) (*types.CommitInfo, error) { + cInfoKey := fmt.Sprintf(commitInfoKeyFmt, ver) + + bz, err := rs.db.Get([]byte(cInfoKey)) + if err != nil { + return nil, errors.Wrap(err, "failed to get commit info") + } else if bz == nil { + return nil, errors.New("no commit info found") + } + + cInfo := &types.CommitInfo{} + if err = cInfo.Unmarshal(bz); err != nil { + return nil, errors.Wrap(err, "failed unmarshal commit info") + } + + return cInfo, nil } func (rs *Store) doProofsQuery(req abci.RequestQuery) abci.ResponseQuery { - commitInfo, err := getCommitInfo(rs.db, req.Height) + commitInfo, err := rs.getCommitInfoFromDb(req.Height) if err != nil { return sdkerrors.QueryResult(err) } @@ -1049,36 +1070,17 @@ func getLatestVersion(db dbm.DB) int64 { return latestVersion } -// Gets commitInfo from disk. -func getCommitInfo(db dbm.DB, ver int64) (*types.CommitInfo, error) { - cInfoKey := fmt.Sprintf(commitInfoKeyFmt, ver) - - bz, err := db.Get([]byte(cInfoKey)) - if err != nil { - return nil, errors.Wrap(err, "failed to get commit info") - } else if bz == nil { - return nil, errors.New("no commit info found") - } - - cInfo := &types.CommitInfo{} - if err = cInfo.Unmarshal(bz); err != nil { - return nil, errors.Wrap(err, "failed unmarshal commit info") - } - - return cInfo, nil -} - -func flushCommitInfo(batch dbm.Batch, version int64, cInfo *types.CommitInfo) { +func setCommitInfo(batch dbm.Batch, cInfo *types.CommitInfo) { bz, err := cInfo.Marshal() if err != nil { panic(err) } - cInfoKey := fmt.Sprintf(commitInfoKeyFmt, version) + cInfoKey := fmt.Sprintf(commitInfoKeyFmt, cInfo.Version) batch.Set([]byte(cInfoKey), bz) } -func flushLatestVersion(batch dbm.Batch, version int64) { +func setLatestVersion(batch dbm.Batch, version int64) { bz, err := gogotypes.StdInt64Marshal(version) if err != nil { panic(err) diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go index adac8310d9e9..5ad5886b0e72 100644 --- a/store/rootmulti/store_test.go +++ b/store/rootmulti/store_test.go @@ -215,7 +215,7 @@ func TestMultistoreLoadWithUpgrade(t *testing.T) { expectedCommitID := getExpectedCommitID(store, 1) checkStore(t, store, expectedCommitID, commitID) - ci, err := getCommitInfo(db, 1) + ci, err := store.getCommitInfoFromDb(1) require.NoError(t, err) require.Equal(t, int64(1), ci.Version) require.Equal(t, 3, len(ci.StoreInfos)) @@ -299,7 +299,7 @@ func TestMultistoreLoadWithUpgrade(t *testing.T) { require.Equal(t, v4, rl4.Get(k4)) // check commitInfo in storage - ci, err = getCommitInfo(db, 2) + ci, err = store.getCommitInfoFromDb(2) require.NoError(t, err) require.Equal(t, int64(2), ci.Version) require.Equal(t, 4, len(ci.StoreInfos), ci.StoreInfos) @@ -355,7 +355,7 @@ func TestMultiStoreRestart(t *testing.T) { multi.Commit() - cinfo, err := getCommitInfo(multi.db, int64(i)) + cinfo, err := multi.getCommitInfoFromDb(int64(i)) require.NoError(t, err) require.Equal(t, int64(i), cinfo.Version) } @@ -370,7 +370,7 @@ func TestMultiStoreRestart(t *testing.T) { multi.Commit() - flushedCinfo, err := getCommitInfo(multi.db, 3) + flushedCinfo, err := multi.getCommitInfoFromDb(3) require.Nil(t, err) require.NotEqual(t, initCid, flushedCinfo, "CID is different after flush to disk") @@ -380,7 +380,7 @@ func TestMultiStoreRestart(t *testing.T) { multi.Commit() - postFlushCinfo, err := getCommitInfo(multi.db, 4) + postFlushCinfo, err := multi.getCommitInfoFromDb(4) require.NoError(t, err) require.Equal(t, int64(4), postFlushCinfo.Version, "Commit changed after in-memory commit")