-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(store/v2): snapshot manager #18458
Changes from all commits
8ddbff9
2569a3f
c0dcf8f
0f462be
9f876dc
beb0ef4
c792bde
5c25952
c558a40
2c30672
07185be
d03866b
19d0cd2
0598995
6d728b1
5026749
fdf6419
91dd007
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package iavl | ||
|
||
import ( | ||
"errors" | ||
|
||
"github.com/cosmos/iavl" | ||
|
||
"cosmossdk.io/store/v2/commitment" | ||
snapshotstypes "cosmossdk.io/store/v2/snapshots/types" | ||
) | ||
|
||
// Exporter is a wrapper around iavl.Exporter. | ||
type Exporter struct { | ||
exporter *iavl.Exporter | ||
} | ||
|
||
// Next returns the next item in the exporter. | ||
func (e *Exporter) Next() (*snapshotstypes.SnapshotIAVLItem, error) { | ||
item, err := e.exporter.Next() | ||
if err != nil { | ||
if errors.Is(err, iavl.ErrorExportDone) { | ||
return nil, commitment.ErrorExportDone | ||
} | ||
return nil, err | ||
} | ||
|
||
return &snapshotstypes.SnapshotIAVLItem{ | ||
Key: item.Key, | ||
Value: item.Value, | ||
Version: item.Version, | ||
Height: int32(item.Height), | ||
}, nil | ||
} | ||
|
||
// Close closes the exporter. | ||
func (e *Exporter) Close() error { | ||
e.exporter.Close() | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,34 @@ | ||||||||||
package iavl | ||||||||||
|
||||||||||
import ( | ||||||||||
"github.com/cosmos/iavl" | ||||||||||
|
||||||||||
snapshotstypes "cosmossdk.io/store/v2/snapshots/types" | ||||||||||
) | ||||||||||
|
||||||||||
// Importer is a wrapper around iavl.Importer. | ||||||||||
type Importer struct { | ||||||||||
importer *iavl.Importer | ||||||||||
} | ||||||||||
|
||||||||||
// Add adds the given item to the importer. | ||||||||||
func (i *Importer) Add(item *snapshotstypes.SnapshotIAVLItem) error { | ||||||||||
return i.importer.Add(&iavl.ExportNode{ | ||||||||||
Key: item.Key, | ||||||||||
Value: item.Value, | ||||||||||
Version: item.Version, | ||||||||||
Height: int8(item.Height), | ||||||||||
}) | ||||||||||
} | ||||||||||
|
||||||||||
// Commit commits the importer. | ||||||||||
func (i *Importer) Commit() error { | ||||||||||
return i.importer.Commit() | ||||||||||
} | ||||||||||
|
||||||||||
// Close closes the importer. | ||||||||||
func (i *Importer) Close() error { | ||||||||||
i.importer.Close() | ||||||||||
|
||||||||||
return nil | ||||||||||
Comment on lines
+31
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. iavl importer returns nothing |
||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,34 @@ func (t *IavlTree) Prune(version uint64) error { | |
return t.tree.DeleteVersionsTo(int64(version)) | ||
} | ||
|
||
// Export exports the tree exporter at the given version. | ||
func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) { | ||
tree, err := t.tree.GetImmutable(int64(version)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
exporter, err := tree.Export() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we please call this
tac0turtle marked this conversation as resolved.
Show resolved
Hide resolved
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &Exporter{ | ||
exporter: exporter, | ||
}, nil | ||
} | ||
|
||
// Import imports the tree importer at the given version. | ||
func (t *IavlTree) Import(version uint64) (commitment.Importer, error) { | ||
importer, err := t.tree.Import(int64(version)) | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &Importer{ | ||
importer: importer, | ||
}, nil | ||
} | ||
|
||
// Close closes the iavl tree. | ||
func (t *IavlTree) Close() error { | ||
return nil | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,14 +3,22 @@ package commitment | |
import ( | ||
"errors" | ||
"fmt" | ||
"io" | ||
"math" | ||
|
||
protoio "github.com/cosmos/gogoproto/io" | ||
ics23 "github.com/cosmos/ics23/go" | ||
|
||
"cosmossdk.io/log" | ||
"cosmossdk.io/store/v2" | ||
"cosmossdk.io/store/v2/snapshots" | ||
snapshotstypes "cosmossdk.io/store/v2/snapshots/types" | ||
) | ||
|
||
var _ store.Committer = (*CommitStore)(nil) | ||
var ( | ||
_ store.Committer = (*CommitStore)(nil) | ||
_ snapshots.CommitSnapshotter = (*CommitStore)(nil) | ||
) | ||
|
||
// CommitStore is a wrapper around multiple Tree objects mapped by a unique store | ||
// key. Each store key reflects dedicated and unique usage within a module. A caller | ||
|
@@ -127,6 +135,146 @@ func (c *CommitStore) Prune(version uint64) (ferr error) { | |
return ferr | ||
} | ||
|
||
// Snapshot implements snapshotstypes.CommitSnapshotter. | ||
func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error { | ||
if version == 0 { | ||
return fmt.Errorf("the snapshot version must be greater than 0") | ||
} | ||
|
||
latestVersion, err := c.GetLatestVersion() | ||
if err != nil { | ||
return err | ||
} | ||
if version > latestVersion { | ||
return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion) | ||
} | ||
|
||
for storeKey, tree := range c.multiTrees { | ||
// TODO: check the parallelism of this loop | ||
if err := func() error { | ||
Comment on lines
+153
to
+154
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this in a function closure? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is for |
||
exporter, err := tree.Export(version) | ||
if err != nil { | ||
return fmt.Errorf("failed to export tree for version %d: %w", version, err) | ||
} | ||
defer exporter.Close() | ||
|
||
err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ | ||
Item: &snapshotstypes.SnapshotItem_Store{ | ||
Store: &snapshotstypes.SnapshotStoreItem{ | ||
Name: storeKey, | ||
}, | ||
}, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("failed to write store name: %w", err) | ||
} | ||
|
||
for { | ||
item, err := exporter.Next() | ||
if errors.Is(err, ErrorExportDone) { | ||
break | ||
} else if err != nil { | ||
return fmt.Errorf("failed to get the next export node: %w", err) | ||
} | ||
|
||
if err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ | ||
Item: &snapshotstypes.SnapshotItem_IAVL{ | ||
IAVL: item, | ||
}, | ||
}); err != nil { | ||
return fmt.Errorf("failed to write iavl node: %w", err) | ||
} | ||
} | ||
|
||
return nil | ||
}(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Restore implements snapshotstypes.CommitSnapshotter. | ||
func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshotstypes.SnapshotItem, error) { | ||
var ( | ||
importer Importer | ||
snapshotItem snapshotstypes.SnapshotItem | ||
storeKey string | ||
) | ||
|
||
loop: | ||
for { | ||
snapshotItem = snapshotstypes.SnapshotItem{} | ||
err := protoReader.ReadMsg(&snapshotItem) | ||
if errors.Is(err, io.EOF) { | ||
break | ||
} else if err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) | ||
} | ||
|
||
switch item := snapshotItem.Item.(type) { | ||
case *snapshotstypes.SnapshotItem_Store: | ||
if importer != nil { | ||
if err := importer.Commit(); err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) | ||
} | ||
importer.Close() | ||
} | ||
storeKey = item.Store.Name | ||
tree := c.multiTrees[storeKey] | ||
if tree == nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey) | ||
} | ||
importer, err = tree.Import(version) | ||
if err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err) | ||
} | ||
defer importer.Close() | ||
|
||
case *snapshotstypes.SnapshotItem_IAVL: | ||
if importer == nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") | ||
} | ||
node := item.IAVL | ||
if node.Height > int32(math.MaxInt8) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we have this restriction, is it something related to iavl? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, iavl requires There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, we should change this in iavlv2, its weird versions can go up to maxint64 but iavl will crash before then. Seems like there is a bit of a misstep in expectations between the two dependencies There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that in v2 of iavl we should remove this restriction. It makes code usage in iavl also awkward: one weekend many months ago before I found out about Matt's rewrite, I tried to tackle concurrency issues and data races in iavl using atomic.Int64 and it was awkward trying to use that in iavl. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is a just tree height, it can be up to maxUint8 (255) |
||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", | ||
item.IAVL.Height, math.MaxInt8) | ||
} | ||
// Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does | ||
// not allow nil keys nor nil values for leaf nodes, so we can always set them to empty. | ||
if node.Key == nil { | ||
node.Key = []byte{} | ||
} | ||
if node.Height == 0 { | ||
if node.Value == nil { | ||
node.Value = []byte{} | ||
} | ||
// If the node is a leaf node, it will be written to the storage. | ||
chStorage <- &store.KVPair{ | ||
Key: node.Key, | ||
Value: node.Value, | ||
StoreKey: storeKey, | ||
} | ||
} | ||
err := importer.Add(node) | ||
if err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err) | ||
} | ||
default: | ||
break loop | ||
} | ||
} | ||
|
||
if importer != nil { | ||
if err := importer.Commit(); err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) | ||
} | ||
} | ||
|
||
return snapshotItem, c.LoadVersion(version) | ||
} | ||
|
||
func (c *CommitStore) Close() (ferr error) { | ||
for _, tree := range c.multiTrees { | ||
if err := tree.Close(); err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
Close
method does not handle the error that might be returned byi.importer.Close()
. It is a best practice to handle errors when closing resources to ensure that any issues are properly logged or managed.Committable suggestion