Skip to content

Commit

Permalink
fix LoadVersionForOverwriting data corruption (#275)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker authored Jun 24, 2020
1 parent bb7cd9c commit 5ef0d85
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 116 deletions.
88 changes: 15 additions & 73 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,18 +361,30 @@ func (tree *MutableTree) LoadVersion(targetVersion int64) (int64, error) {
}

// LoadVersionForOverwriting attempts to load a tree at a previously committed
// version. Any versions greater than targetVersion will be deleted.
// version, or the latest version below it. Any versions greater than targetVersion will be deleted.
func (tree *MutableTree) LoadVersionForOverwriting(targetVersion int64) (int64, error) {
latestVersion, err := tree.LoadVersion(targetVersion)
if err != nil {
return latestVersion, err
}

if err := tree.deleteVersionsFrom(targetVersion + 1); err != nil {
if err = tree.ndb.DeleteVersionsFrom(targetVersion + 1); err != nil {
return latestVersion, err
}

return targetVersion, nil
if err = tree.ndb.Commit(); err != nil {
return latestVersion, err
}

tree.ndb.resetLatestVersion(latestVersion)

for v := range tree.versions {
if v > targetVersion {
delete(tree.versions, v)
}
}

return latestVersion, nil
}

// GetImmutable loads an ImmutableTree at a given version for querying. The returned tree is
Expand Down Expand Up @@ -539,76 +551,6 @@ func (tree *MutableTree) DeleteVersion(version int64) error {
return nil
}

// deleteVersionsFrom deletes tree version from disk specified version to latest
// version along with each version's metadata. The version can then no longer be
// accessed.
func (tree *MutableTree) deleteVersionsFrom(version int64) error {
if version <= 0 {
return errors.New("version must be greater than 0")
}

newLatestVersion := version - 1
lastestVersion := tree.ndb.getLatestVersion()

for ; version <= lastestVersion; version++ {
if version == tree.version {
return errors.Errorf("cannot delete latest saved version (%d)", version)
}

if err := tree.ndb.DeleteVersion(version, false); err != nil {
return err
}

if version == lastestVersion {
root, err := tree.ndb.getRoot(version)
if err != nil {
return err
}

if err := tree.deleteNodes(newLatestVersion, root); err != nil {
return err
}
}

delete(tree.versions, version)
}

tree.ndb.restoreNodes(newLatestVersion)

if err := tree.ndb.Commit(); err != nil {
return err
}

tree.ndb.resetLatestVersion(newLatestVersion)
return nil
}

// deleteNodes deletes all nodes which have greater version than current, because they are not useful anymore
// FIXME This should probably happen in NodeDB.
func (tree *MutableTree) deleteNodes(version int64, hash []byte) error {
if len(hash) == 0 {
return nil
}

node := tree.ndb.GetNode(hash)
if node.leftHash != nil {
if err := tree.deleteNodes(version, node.leftHash); err != nil {
return err
}
}
if node.rightHash != nil {
if err := tree.deleteNodes(version, node.rightHash); err != nil {
return err
}
}

if node.version > version {
tree.ndb.batch.Delete(tree.ndb.nodeKey(hash))
}

return nil
}

// Rotate right and return the new node and orphan.
func (tree *MutableTree) rotateRight(node *Node) (*Node, *Node) {
version := tree.version + 1
Expand Down
95 changes: 85 additions & 10 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"container/list"
"crypto/sha256"
"fmt"
"math"
"sort"
"sync"

Expand Down Expand Up @@ -184,6 +185,84 @@ func (ndb *nodeDB) DeleteVersion(version int64, checkLatestVersion bool) error {
return nil
}

// DeleteVersionsFrom permanently deletes all tree versions from the given version upwards.
func (ndb *nodeDB) DeleteVersionsFrom(version int64) error {
latest := ndb.getLatestVersion()
if latest < version {
return nil
}
root, err := ndb.getRoot(latest)
if err != nil {
return err
}
if root == nil {
return errors.Errorf("root for version %v not found", latest)
}

for v, r := range ndb.versionReaders {
if v >= version && r != 0 {
return errors.Errorf("unable to delete version %v with %v active readers", v, r)
}
}

// First, delete all active nodes in the current (latest) version whose node version is after
// the given version.
err = ndb.deleteNodesFrom(version, root)
if err != nil {
return err
}

// Next, delete orphans:
// - Delete orphan entries *and referred nodes* with fromVersion >= version
// - Delete orphan entries with toVersion >= version-1 (since orphans at latest are not orphans)
ndb.traverseOrphans(func(key, hash []byte) {
var fromVersion, toVersion int64
orphanKeyFormat.Scan(key, &toVersion, &fromVersion)

if fromVersion >= version {
ndb.batch.Delete(key)
ndb.batch.Delete(ndb.nodeKey(hash))
ndb.uncacheNode(hash)
} else if toVersion >= version-1 {
ndb.batch.Delete(key)
}
})

// Finally, delete the version root entries
ndb.traverseRange(rootKeyFormat.Key(version), rootKeyFormat.Key(math.MaxInt64), func(k, v []byte) {
ndb.batch.Delete(k)
})

return nil
}

// deleteNodesFrom deletes the given node and any descendants that have versions after the given
// (inclusive). It is mainly used via LoadVersionForOverwriting, to delete the current version.
func (ndb *nodeDB) deleteNodesFrom(version int64, hash []byte) error {
if len(hash) == 0 {
return nil
}

node := ndb.GetNode(hash)
if node.leftHash != nil {
if err := ndb.deleteNodesFrom(version, node.leftHash); err != nil {
return err
}
}
if node.rightHash != nil {
if err := ndb.deleteNodesFrom(version, node.rightHash); err != nil {
return err
}
}

if node.version >= version {
ndb.batch.Delete(ndb.nodeKey(hash))
ndb.uncacheNode(hash)
}

return nil
}

// Saves orphaned nodes to disk under a special prefix.
// version: the new version being saved.
// orphans: the orphan nodes created since version-1
Expand Down Expand Up @@ -309,7 +388,12 @@ func (ndb *nodeDB) traverseOrphansVersion(version int64, fn func(k, v []byte)) {

// Traverse all keys.
func (ndb *nodeDB) traverse(fn func(key, value []byte)) {
itr, err := ndb.db.Iterator(nil, nil)
ndb.traverseRange(nil, nil, fn)
}

// Traverse all keys between a given range (excluding end).
func (ndb *nodeDB) traverseRange(start []byte, end []byte, fn func(k, v []byte)) {
itr, err := ndb.db.Iterator(start, end)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -498,15 +582,6 @@ func (ndb *nodeDB) traverseNodes(fn func(hash []byte, node *Node)) {
}
}

// restoreNodes restores nodes, which was orphaned, but after overwriting should not be orphans anymore
func (ndb *nodeDB) restoreNodes(version int64) {
// FIXME This fails to take into account future orphans, see:
// https://github.com/cosmos/iavl/issues/273
ndb.traverseOrphansVersion(version, func(key, hash []byte) {
ndb.batch.Delete(key)
})
}

func (ndb *nodeDB) String() string {
var str string
index := 0
Expand Down
56 changes: 29 additions & 27 deletions tree_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func TestRandomOperations(t *testing.T) {
seeds := []int64{
49872768940,
49872768941,
756509998,
480459882,
32473644,
Expand All @@ -41,13 +41,13 @@ func testRandomOperations(t *testing.T, randSeed int64) {
keySize = 16 // before base64-encoding
valueSize = 16 // before base64-encoding

versions = 32 // number of final versions to generate
reloadChance = 0.2 // chance of tree reload after save
deleteChance = 0.2 // chance of random version deletion after save
revertChance = 0.0 // chance to revert tree to random version with LoadVersionForOverwriting
syncChance = 0.3 // chance of enabling sync writes on tree load
cacheChance = 0.4 // chance of enabling caching
cacheSizeMax = 256 // maximum size of cache (will be random from 1)
versions = 32 // number of final versions to generate
reloadChance = 0.1 // chance of tree reload after save
deleteChance = 0.2 // chance of random version deletion after save
revertChance = 0.05 // chance to revert tree to random version with LoadVersionForOverwriting
syncChance = 0.2 // chance of enabling sync writes on tree load
cacheChance = 0.4 // chance of enabling caching
cacheSizeMax = 256 // maximum size of cache (will be random from 1)

versionOps = 64 // number of operations (create/update/delete) per version
updateRatio = 0.4 // ratio of updates out of all operations
Expand Down Expand Up @@ -170,26 +170,28 @@ func testRandomOperations(t *testing.T, randSeed int64) {
// Revert tree to historical version if requested, deleting all subsequent versions.
if r.Float64() < revertChance {
versions := getMirrorVersions(diskMirrors, memMirrors)
target := int64(versions[r.Intn(len(versions)-1)])
t.Logf("Reverting to version %v", target)
_, err = tree.LoadVersionForOverwriting(target)
require.NoError(t, err, "Failed to revert to version %v", target)
if m, ok := diskMirrors[target]; ok {
mirror = copyMirror(m)
} else if m, ok := memMirrors[target]; ok {
mirror = copyMirror(m)
} else {
t.Fatalf("Mirror not found for revert target %v", target)
}
mirrorKeys = getMirrorKeys(mirror)
for v := range diskMirrors {
if v > target {
delete(diskMirrors, v)
if len(versions) > 1 {
version = int64(versions[r.Intn(len(versions)-1)])
t.Logf("Reverting to version %v", version)
_, err = tree.LoadVersionForOverwriting(version)
require.NoError(t, err, "Failed to revert to version %v", version)
if m, ok := diskMirrors[version]; ok {
mirror = copyMirror(m)
} else if m, ok := memMirrors[version]; ok {
mirror = copyMirror(m)
} else {
t.Fatalf("Mirror not found for revert target %v", version)
}
}
for v := range memMirrors {
if v > target {
delete(memMirrors, v)
mirrorKeys = getMirrorKeys(mirror)
for v := range diskMirrors {
if v > version {
delete(diskMirrors, v)
}
}
for v := range memMirrors {
if v > version {
delete(memMirrors, v)
}
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,17 +1438,20 @@ func TestLoadVersionForOverwritingCase2(t *testing.T) {
func TestLoadVersionForOverwritingCase3(t *testing.T) {
require := require.New(t)

tree, _ := NewMutableTreeWithOpts(db.NewMemDB(), 0, nil)
tree, err := NewMutableTreeWithOpts(db.NewMemDB(), 0, nil)
require.NoError(err)

for i := byte(0); i < 20; i++ {
tree.Set([]byte{i}, []byte{i})
}
tree.SaveVersion()
_, _, err = tree.SaveVersion()
require.NoError(err)

for i := byte(0); i < 20; i++ {
tree.Set([]byte{i}, []byte{i + 1})
}
tree.SaveVersion()
_, _, err = tree.SaveVersion()
require.NoError(err)

removedNodes := []*Node{}

Expand All @@ -1461,12 +1464,14 @@ func TestLoadVersionForOverwritingCase3(t *testing.T) {
for i := byte(0); i < 20; i++ {
tree.Remove([]byte{i})
}
tree.SaveVersion()
_, _, err = tree.SaveVersion()
require.NoError(err)

_, err := tree.LoadVersionForOverwriting(1)
_, err = tree.LoadVersionForOverwriting(1)
require.NoError(err)
for _, n := range removedNodes {
has, _ := tree.ndb.Has(n.hash)
has, err := tree.ndb.Has(n.hash)
require.NoError(err)
require.False(has, "LoadVersionForOverwriting should remove useless nodes")
}

Expand Down

0 comments on commit 5ef0d85

Please sign in to comment.