Skip to content

Commit

Permalink
fix: commit info data race (#155)
Browse files Browse the repository at this point in the history
* bugfix: fix race condition related to last commmit info and flush prune heights on every change

* fix log

* flush pruning heights

* WriteSync on flush, fix flush tests

* typo

* fix rebase conflict
  • Loading branch information
p0mvn authored Mar 26, 2022
1 parent 2ed314c commit d53f1dd
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 94 deletions.
30 changes: 22 additions & 8 deletions pruning/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

type Manager struct {
logger log.Logger
db dbm.DB
opts *types.PruningOptions
snapshotInterval uint64
pruneHeights []int64
Expand All @@ -24,11 +25,14 @@ type Manager struct {
const (
pruneHeightsKey = "s/pruneheights"
pruneSnapshotHeightsKey = "s/pruneSnheights"

uint64Size = 8
)

func NewManager(logger log.Logger) *Manager {
func NewManager(logger log.Logger, db dbm.DB) *Manager {
return &Manager{
logger: logger,
db: db,
opts: types.NewPruningOptions(types.PruningNothing),
pruneHeights: []int64{},
// These are the heights that are multiples of snapshotInterval and kept for state sync snapshots.
Expand Down Expand Up @@ -120,12 +124,18 @@ func (m *Manager) ShouldPruneAtHeight(height int64) bool {
}

// FlushPruningHeights flushes the pruning heights to the database for crash recovery.
func (m *Manager) FlushPruningHeights(batch dbm.Batch) {
func (m *Manager) FlushPruningHeights() {
if m.opts.GetPruningStrategy() == types.PruningNothing {
return
}
batch := m.db.NewBatch()
defer batch.Close()
m.flushPruningHeights(batch)
m.flushPruningSnapshotHeights(batch)

if err := batch.WriteSync(); err != nil {
panic(fmt.Errorf("error on batch write %w", err))
}
}

// LoadPruningHeights loads the pruning heights from the database as a crash recovery.
Expand Down Expand Up @@ -193,24 +203,28 @@ func (m *Manager) loadPruningSnapshotHeights(db dbm.DB) error {
}

func (m *Manager) flushPruningHeights(batch dbm.Batch) {
bz := make([]byte, 0)
bz := make([]byte, 0, len(m.pruneHeights) * uint64Size)
for _, ph := range m.pruneHeights {
buf := make([]byte, 8)
buf := make([]byte, uint64Size)
binary.BigEndian.PutUint64(buf, uint64(ph))
bz = append(bz, buf...)
}

batch.Set([]byte(pruneHeightsKey), bz)
if err := batch.Set([]byte(pruneHeightsKey), bz); err != nil {
panic(err)
}
}

func (m *Manager) flushPruningSnapshotHeights(batch dbm.Batch) {
m.mx.Lock()
defer m.mx.Unlock()
bz := make([]byte, 0)
bz := make([]byte, 0, m.pruneSnapshotHeights.Len() * uint64Size)
for e := m.pruneSnapshotHeights.Front(); e != nil; e = e.Next() {
buf := make([]byte, 8)
buf := make([]byte, uint64Size)
binary.BigEndian.PutUint64(buf, uint64(e.Value.(int64)))
bz = append(bz, buf...)
}
batch.Set([]byte(pruneSnapshotHeightsKey), bz)
if err := batch.Set([]byte(pruneSnapshotHeightsKey), bz); err != nil {
panic(err)
}
}
41 changes: 21 additions & 20 deletions pruning/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func Test_NewManager(t *testing.T) {
manager := pruning.NewManager(log.NewNopLogger())
manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB())

require.NotNil(t, manager)
require.NotNil(t, manager.GetPruningHeights())
Expand Down Expand Up @@ -75,7 +75,7 @@ func Test_Strategies(t *testing.T) {
},
}

manager := pruning.NewManager(log.NewNopLogger())
manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB())

require.NotNil(t, manager)

Expand Down Expand Up @@ -143,30 +143,34 @@ func Test_Strategies(t *testing.T) {
}

func Test_FlushLoad(t *testing.T) {
manager := pruning.NewManager(log.NewNopLogger())
const (
totalHeights = 1000
snapshotInterval = uint64(10)

pruningKeepRecent = 100
pruningInterval = 15
)

var (
db = db.NewMemDB()
manager = pruning.NewManager(log.NewNopLogger(), db)
curStrategy = types.NewCustomPruningOptions(pruningKeepRecent, pruningInterval)
heightsToPruneMirror = make([]int64, 0)
)
require.NotNil(t, manager)

db := db.NewMemDB()

curStrategy := types.NewCustomPruningOptions(100, 15)

snapshotInterval := uint64(10)

manager.SetSnapshotInterval(snapshotInterval)

manager.SetOptions(curStrategy)
require.Equal(t, curStrategy, manager.GetOptions())

keepRecent := curStrategy.KeepRecent

heightsToPruneMirror := make([]int64, 0)

for curHeight := int64(0); curHeight < 1000; curHeight++ {
handleHeightActual := manager.HandleHeight(curHeight)

curHeightStr := fmt.Sprintf("height: %d", curHeight)

if curHeight > int64(keepRecent) && (snapshotInterval != 0 && (curHeight-int64(keepRecent))%int64(snapshotInterval) != 0 || snapshotInterval == 0) {
expectedHandleHeight := curHeight - int64(keepRecent)
if curHeight > int64(pruningKeepRecent) && (snapshotInterval != 0 && (curHeight-int64(pruningKeepRecent))%int64(snapshotInterval) != 0 || snapshotInterval == 0) {
expectedHandleHeight := curHeight - int64(pruningKeepRecent)
require.Equal(t, expectedHandleHeight, handleHeightActual, curHeightStr)
heightsToPruneMirror = append(heightsToPruneMirror, expectedHandleHeight)
} else {
Expand All @@ -181,10 +185,7 @@ func Test_FlushLoad(t *testing.T) {
// N.B.: There is no reason behind the choice of 3.
if curHeight%3 == 0 {
require.Equal(t, heightsToPruneMirror, manager.GetPruningHeights(), curHeightStr)
batch := db.NewBatch()
manager.FlushPruningHeights(batch)
require.NoError(t, batch.Write())
require.NoError(t, batch.Close())
manager.FlushPruningHeights()

manager.ResetPruningHeights()
require.Equal(t, make([]int64, 0), manager.GetPruningHeights(), curHeightStr)
Expand All @@ -197,7 +198,7 @@ func Test_FlushLoad(t *testing.T) {
}

func Test_WithSnapshot(t *testing.T) {
manager := pruning.NewManager(log.NewNopLogger())
manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB())
require.NotNil(t, manager)

curStrategy := types.NewCustomPruningOptions(10, 10)
Expand Down
124 changes: 63 additions & 61 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"sort"
"strings"
"sync"

"github.com/cosmos/cosmos-sdk/pruning"
pruningTypes "github.com/cosmos/cosmos-sdk/pruning/types"
Expand Down Expand Up @@ -45,13 +46,21 @@ const (
snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)

type storeParams struct {
key types.StoreKey
db dbm.DB
typ types.StoreType
initialVersion uint64
}

// Store is composed of many CommitStores. Name contrasts with
// cacheMultiStore which is used for branching other MultiStores. It implements
// the CommitMultiStore interface.
type Store struct {
db dbm.DB
logger log.Logger
lastCommitInfo *types.CommitInfo
mx *sync.RWMutex // mutex to sync access to lastCommitInfo
pruningManager *pruning.Manager
iavlCacheSize int
storesParams map[types.StoreKey]storeParams
Expand Down Expand Up @@ -86,7 +95,8 @@ func NewStore(db dbm.DB, logger log.Logger) *Store {
stores: make(map[types.StoreKey]types.CommitKVStore),
keysByName: make(map[string]types.StoreKey),
listeners: make(map[types.StoreKey][]types.WriteListener),
pruningManager: pruning.NewManager(logger),
pruningManager: pruning.NewManager(logger, db),
mx: &sync.RWMutex{},
}
}

Expand Down Expand Up @@ -197,7 +207,7 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {
// load old data if we are not version 0
if ver != 0 {
var err error
cInfo, err = getCommitInfo(rs.db, ver)
cInfo, err = rs.getCommitInfoFromDb(ver)
if err != nil {
return err
}
Expand Down Expand Up @@ -378,6 +388,8 @@ func (rs *Store) ListeningEnabled(key types.StoreKey) bool {

// LastCommitID implements Committer/CommitStore.
func (rs *Store) LastCommitID() types.CommitID {
rs.mx.RLock()
defer rs.mx.RUnlock()
if rs.lastCommitInfo == nil {
return types.CommitID{
Version: getLatestVersion(rs.db),
Expand Down Expand Up @@ -405,20 +417,18 @@ func (rs *Store) Commit() types.CommitID {
version = previousHeight + 1
}

rs.lastCommitInfo = rs.commitStores(version, rs.stores)
newCommitInfo := rs.commitStores(version, rs.stores)
rs.updateLatestCommitInfo(newCommitInfo, version)

var pruneErr error
defer func() {
rs.flushMetadata(rs.db, version, rs.lastCommitInfo)
if pruneErr != nil {
panic(pruneErr)
}
}()

pruneErr = rs.handlePruning(version)
err := rs.handlePruning(version)
if err != nil {
panic(err)
}

rs.mx.RLock()
hash, keys := rs.lastCommitInfo.Hash()
rs.logger.Info("calculated commit hash", "height", version, "commit_hash", hash, "keys", keys)
defer rs.mx.RUnlock()
rs.logger.Info("calculated commit hash", "height", version, "commit_hash", fmt.Sprintf("%X", hash), "keys", keys)

return types.CommitID{
Version: version,
Expand Down Expand Up @@ -520,6 +530,8 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
}

func (rs *Store) handlePruning(version int64) error {
defer rs.pruningManager.FlushPruningHeights()

rs.pruningManager.HandleHeight(version - 1) // we should never prune the current version.
if rs.pruningManager.ShouldPruneAtHeight(version) {
rs.logger.Info("prune start", "height", version)
Expand Down Expand Up @@ -603,18 +615,10 @@ func (rs *Store) Query(req abci.RequestQuery) abci.ResponseQuery {
return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "proof is unexpectedly empty; ensure height has not been pruned"))
}

// If the request's height is the latest height we've committed, then utilize
// the store's lastCommitInfo as this commit info may not be flushed to disk.
// Otherwise, we query for the commit info from disk.
var commitInfo *types.CommitInfo

if res.Height == rs.lastCommitInfo.Version {
commitInfo = rs.lastCommitInfo
} else {
commitInfo, err = getCommitInfo(rs.db, res.Height)
if err != nil {
return sdkerrors.QueryResult(err)
}
commitInfo, err := rs.getCommitInfoFromDb(res.Height)
if err != nil {
return sdkerrors.QueryResult(err)
}

// Restore origin path and append proof op.
Expand Down Expand Up @@ -887,7 +891,8 @@ func (rs *Store) Restore(
importer.Close()
}

rs.flushMetadata(rs.db, int64(height), rs.buildCommitInfo(int64(height)))
rs.flushLastCommitInfo(rs.buildCommitInfo(int64(height)))
rs.pruningManager.FlushPruningHeights()
return rs.LoadLatestVersion()
}

Expand Down Expand Up @@ -992,30 +997,46 @@ func (rs *Store) commitStores(version int64, storeMap map[types.StoreKey]types.C
}
}

func (rs *Store) flushMetadata(db dbm.DB, version int64, cInfo *types.CommitInfo) {
rs.logger.Debug("flushing metadata", "height", version)
batch := db.NewBatch()
func (rs *Store) updateLatestCommitInfo(newCommitInfo *types.CommitInfo, version int64) {
rs.mx.Lock()
defer rs.mx.Unlock()
rs.lastCommitInfo = newCommitInfo
rs.flushLastCommitInfo(newCommitInfo)
}

func (rs *Store) flushLastCommitInfo(cInfo *types.CommitInfo) {
batch := rs.db.NewBatch()
defer batch.Close()

flushCommitInfo(batch, version, cInfo)
flushLatestVersion(batch, version)
rs.pruningManager.FlushPruningHeights(batch)
setCommitInfo(batch, cInfo)
setLatestVersion(batch, cInfo.Version)

if err := batch.Write(); err != nil {
if err := batch.WriteSync(); err != nil {
panic(fmt.Errorf("error on batch write %w", err))
}
rs.logger.Debug("flushing metadata finished", "height", version)
}

type storeParams struct {
key types.StoreKey
db dbm.DB
typ types.StoreType
initialVersion uint64
// Gets commitInfo from disk.
func (rs *Store) getCommitInfoFromDb(ver int64) (*types.CommitInfo, error) {
cInfoKey := fmt.Sprintf(commitInfoKeyFmt, ver)

bz, err := rs.db.Get([]byte(cInfoKey))
if err != nil {
return nil, errors.Wrap(err, "failed to get commit info")
} else if bz == nil {
return nil, errors.New("no commit info found")
}

cInfo := &types.CommitInfo{}
if err = cInfo.Unmarshal(bz); err != nil {
return nil, errors.Wrap(err, "failed unmarshal commit info")
}

return cInfo, nil
}

func (rs *Store) doProofsQuery(req abci.RequestQuery) abci.ResponseQuery {
commitInfo, err := getCommitInfo(rs.db, req.Height)
commitInfo, err := rs.getCommitInfoFromDb(req.Height)
if err != nil {
return sdkerrors.QueryResult(err)
}
Expand Down Expand Up @@ -1049,36 +1070,17 @@ func getLatestVersion(db dbm.DB) int64 {
return latestVersion
}

// Gets commitInfo from disk.
func getCommitInfo(db dbm.DB, ver int64) (*types.CommitInfo, error) {
cInfoKey := fmt.Sprintf(commitInfoKeyFmt, ver)

bz, err := db.Get([]byte(cInfoKey))
if err != nil {
return nil, errors.Wrap(err, "failed to get commit info")
} else if bz == nil {
return nil, errors.New("no commit info found")
}

cInfo := &types.CommitInfo{}
if err = cInfo.Unmarshal(bz); err != nil {
return nil, errors.Wrap(err, "failed unmarshal commit info")
}

return cInfo, nil
}

func flushCommitInfo(batch dbm.Batch, version int64, cInfo *types.CommitInfo) {
func setCommitInfo(batch dbm.Batch, cInfo *types.CommitInfo) {
bz, err := cInfo.Marshal()
if err != nil {
panic(err)
}

cInfoKey := fmt.Sprintf(commitInfoKeyFmt, version)
cInfoKey := fmt.Sprintf(commitInfoKeyFmt, cInfo.Version)
batch.Set([]byte(cInfoKey), bz)
}

func flushLatestVersion(batch dbm.Batch, version int64) {
func setLatestVersion(batch dbm.Batch, version int64) {
bz, err := gogotypes.StdInt64Marshal(version)
if err != nil {
panic(err)
Expand Down
Loading

0 comments on commit d53f1dd

Please sign in to comment.