Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: memiavl don't support rollback #1091

Merged
merged 8 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

- [#1042](https://github.com/crypto-org-chain/cronos/pull/1042) call Close method on app to cleanup resource on graceful shutdown ([ethermint commit](https://github.com/crypto-org-chain/ethermint/commit/0ea7b86532a1144f229961f94b4524d5889e874d)).
- [#1083](https://github.com/crypto-org-chain/cronos/pull/1083) memiavl support both sdk 46 and 47 root hash rules.
- [#1091](https://github.com/crypto-org-chain/cronos/pull/1091) memiavl support rollback.

### Improvements

Expand Down
8 changes: 5 additions & 3 deletions integration_tests/configs/rollback.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ local config = import 'default.jsonnet';

config {
'cronos_777-1'+: {
'app-config'+: {
'iavl-disable-fastnode': true,
},
validators: super.validators + [{
name: 'fullnode',
'app-config': {
memiavl: {
enable: true,
},
},
}],
},
}
220 changes: 143 additions & 77 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type Options struct {
ZeroCopy bool
// CacheSize defines the cache's max entry size for each memiavl store.
CacheSize int
// LoadForOverwriting if true rollbacks the state, specifically the Load method will
// truncate the versions after the `TargetVersion`, the `TargetVersion` becomes the latest version.
// it do nothing if the target version is `0`.
LoadForOverwriting bool
}

const (
Expand All @@ -96,27 +100,17 @@ const (
)

func Load(dir string, opts Options) (*DB, error) {
snapshotName := "current"
snapshot := "current"
if opts.TargetVersion > 0 {
version, err := currentVersion(dir)
// find the biggest snapshot version that's less than or equal to the target version
snapshotVersion, err := seekSnapshot(dir, opts.TargetVersion)
if err != nil {
return nil, fmt.Errorf("failed to load current version: %w", err)
}

if int64(opts.TargetVersion) < version {
// try to load historical snapshots
snapshotName, err = seekSnapshot(dir, opts.TargetVersion)
if err != nil {
return nil, fmt.Errorf("failed to find snapshot: %w", err)
}

if snapshotName == "" {
return nil, fmt.Errorf("target version is pruned: %d", opts.TargetVersion)
}
return nil, fmt.Errorf("fail to seek snapshot: %w", err)
}
snapshot = snapshotName(snapshotVersion)
}

path := filepath.Join(dir, snapshotName)
path := filepath.Join(dir, snapshot)
mtree, err := LoadMultiTree(path, opts.ZeroCopy, opts.CacheSize)
if err != nil {
if opts.CreateIfMissing && os.IsNotExist(err) {
Expand All @@ -141,6 +135,43 @@ func Load(dir string, opts Options) (*DB, error) {
}
}

if opts.LoadForOverwriting && opts.TargetVersion > 0 {
currentSnapshot, err := os.Readlink(currentPath(dir))
if err != nil {
return nil, fmt.Errorf("fail to read current version: %w", err)
}

if snapshot != currentSnapshot {
// downgrade `"current"` link first
opts.Logger.Info("downgrade current link to %s", snapshot)
if err := updateCurrentSymlink(dir, snapshot); err != nil {
return nil, fmt.Errorf("fail to update current snapshot link: %w", err)
}
}

// truncate the WAL
opts.Logger.Info("truncate WAL from back, version: %d", opts.TargetVersion)
if err := wal.TruncateBack(walIndex(int64(opts.TargetVersion), mtree.initialVersion)); err != nil {

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
return nil, fmt.Errorf("fail to truncate wal logs: %w", err)
}

// prune snapshots that's larger than the target version
if err := traverseSnapshots(dir, false, func(version int64) (bool, error) {
if version <= int64(opts.TargetVersion) {

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
return true, nil
}

if err := atomicRemoveDir(filepath.Join(dir, snapshotName(version))); err != nil {
opts.Logger.Error("fail to prune snapshot, version: %d", version)
} else {
opts.Logger.Info("prune snapshot, version: %d", version)
}
return false, nil
}); err != nil {
return nil, fmt.Errorf("fail to prune snapshots: %w", err)
}
}

db := &DB{
MultiTree: *mtree,
logger: opts.Logger,
Expand Down Expand Up @@ -270,45 +301,35 @@ func (db *DB) pruneSnapshots() {
go func() {
defer db.pruneSnapshotLock.Unlock()

currentName, err := os.Readlink(currentPath(db.dir))
if err != nil {
db.logger.Error("failed to read current snapshot name", "err", err)
return
}

entries, err := os.ReadDir(db.dir)
currentVersion, err := currentVersion(db.dir)
if err != nil {
db.logger.Error("failed to read db dir", "err", err)
db.logger.Error("failed to read current snapshot version", "err", err)
return
}

counter := db.snapshotKeepRecent
for i := len(entries) - 1; i >= 0; i-- {
name := entries[i].Name()
if !entries[i].IsDir() || !isSnapshotName(name) {
continue
}

if name >= currentName {
if err := traverseSnapshots(db.dir, false, func(version int64) (bool, error) {
if version >= currentVersion {
// ignore any newer snapshot directories, there could be ongoning snapshot rewrite.
continue
return false, nil
}

if counter > 0 {
counter--
continue
return false, nil
}

name := snapshotName(version)
db.logger.Info("prune snapshot", "name", name)
tmpPath := filepath.Join(db.dir, name+"-tmp")
if err := os.Rename(filepath.Join(db.dir, name), tmpPath); err != nil {
db.logger.Error("failed to move the snapshot to tmp file", "err", err)
continue
}

if err := os.RemoveAll(tmpPath); err != nil {
if err := atomicRemoveDir(filepath.Join(db.dir, name)); err != nil {
db.logger.Error("failed to prune snapshot", "err", err)
}

return false, nil
}); err != nil {
db.logger.Error("fail to prune snapshots", "err", err)
return
}

// truncate WAL until the earliest remaining snapshot
Expand Down Expand Up @@ -436,8 +457,7 @@ func (db *DB) RewriteSnapshot() error {
db.mtx.Lock()
defer db.mtx.Unlock()

version := uint32(db.lastCommitInfo.Version)
snapshotDir := snapshotName(version)
snapshotDir := snapshotName(db.lastCommitInfo.Version)
tmpDir := snapshotDir + "-tmp"
path := filepath.Join(db.dir, tmpDir)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
Expand Down Expand Up @@ -610,18 +630,14 @@ func (db *DB) WriteSnapshot(dir string) error {
return db.MultiTree.WriteSnapshot(dir)
}

func snapshotName(version uint32) string {
func snapshotName(version int64) string {
return fmt.Sprintf("%s%020d", SnapshotPrefix, version)
}

func currentPath(root string) string {
return filepath.Join(root, "current")
}

func snapshotPath(root string, version uint32) string {
return filepath.Join(root, snapshotName(version))
}

func currentTmpPath(root string) string {
return filepath.Join(root, "current-tmp")
}
Expand All @@ -645,53 +661,54 @@ func parseVersion(name string) (int64, error) {
return 0, fmt.Errorf("invalid snapshot name %s", name)
}

return strconv.ParseInt(name[len(SnapshotPrefix):], 10, 64)
}

// seekSnapshot find the biggest snapshot that's smaller than or equal to target version,
// returns the directory name, if not found, returns empty string.
func seekSnapshot(root string, version uint32) (string, error) {
entries, err := os.ReadDir(root)
v, err := strconv.ParseInt(name[len(SnapshotPrefix):], 10, 32)
if err != nil {
return "", err
return 0, fmt.Errorf("snapshot version overflows: %d", err)
}

targetName := snapshotName(version)
for i := len(entries) - 1; i >= 0; i-- {
name := entries[i].Name()
if !entries[i].IsDir() || !isSnapshotName(name) {
continue
}
return v, nil
}

if name <= targetName {
return name, nil
// seekSnapshot find the biggest snapshot version that's smaller than or equal to the target version,
// returns 0 if not found.
func seekSnapshot(root string, targetVersion uint32) (int64, error) {
var (
snapshotVersion int64
found bool
)
if err := traverseSnapshots(root, false, func(version int64) (bool, error) {
if version <= int64(targetVersion) {

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
found = true
snapshotVersion = version
return true, nil
}
return false, nil
}); err != nil {
return 0, err
}

return "", nil
if !found {
return 0, fmt.Errorf("target version is pruned: %d", targetVersion)
}

return snapshotVersion, nil
}

// firstSnapshotVersion returns the earliest snapshot name in the db
func firstSnapshotVersion(root string) (int64, error) {
entries, err := os.ReadDir(root)
if err != nil {
var found int64
if err := traverseSnapshots(root, true, func(version int64) (bool, error) {
found = version
return true, nil
}); err != nil {
return 0, err
}

for _, entry := range entries {
if !entry.IsDir() || !isSnapshotName(entry.Name()) {
continue
}

version, err := parseVersion(entry.Name())
if err != nil {
return 0, err
}

return version, nil
if found == 0 {
return 0, errors.New("empty memiavl db")
}

return 0, errors.New("empty memiavl db")
return found, nil
}

func walPath(root string) string {
Expand Down Expand Up @@ -727,6 +744,55 @@ func updateCurrentSymlink(dir, snapshot string) error {
return os.Rename(tmpPath, currentPath(dir))
}

// traverseSnapshots traverse the snapshot list in specified order.
func traverseSnapshots(dir string, ascending bool, callback func(int64) (bool, error)) error {
entries, err := os.ReadDir(dir)
if err != nil {
return err
}

process := func(entry os.DirEntry) (bool, error) {
if !entry.IsDir() || !isSnapshotName(entry.Name()) {
return false, nil
}

version, err := parseVersion(entry.Name())
if err != nil {
return true, fmt.Errorf("invalid snapshot name: %w", err)
}

return callback(version)
}

if ascending {
for i := 0; i < len(entries); i++ {
stop, err := process(entries[i])
if stop || err != nil {
return err
}
}
} else {
for i := len(entries) - 1; i >= 0; i-- {
stop, err := process(entries[i])
if stop || err != nil {
return err
}
}
}

return nil
}

// atomicRemoveDir is equavalent to `mv snapshot snapshot-tmp && rm -r snapshot-tmp`
func atomicRemoveDir(path string) error {
tmpPath := path + "-tmp"
if err := os.Rename(path, tmpPath); err != nil {
return err
}

return os.RemoveAll(tmpPath)
}

type walEntry struct {
index uint64
data *WALEntry
Expand Down
3 changes: 2 additions & 1 deletion memiavl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"path/filepath"

"cosmossdk.io/errors"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
Expand Down Expand Up @@ -46,7 +47,7 @@ func (db *DB) Snapshot(height uint64, protoWriter protoio.Writer) (returnErr err
if int64(version) > curVersion {
return fmt.Errorf("snapshot is not created yet: height: %d", version)
}
mtree, err = LoadMultiTree(snapshotPath(db.dir, version), true, 0)
mtree, err = LoadMultiTree(filepath.Join(db.dir, snapshotName(int64(version))), true, 0)

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
if err != nil {
return errors.Wrapf(err, "snapshot don't exists: height: %d", version)
}
Expand Down
2 changes: 1 addition & 1 deletion memiavl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Import(
if height > math.MaxUint32 {
return snapshottypes.SnapshotItem{}, fmt.Errorf("version overflows uint32: %d", height)
}
snapshotDir := snapshotName(uint32(height))
snapshotDir := snapshotName(int64(height))

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
tmpDir := snapshotDir + "-tmp"

// Import nodes into stores. The first item is expected to be a SnapshotItem containing
Expand Down
Loading