From 8ddbff9a778f4945c0cf70a20e3211452bb3f62a Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Mon, 13 Nov 2023 12:36:29 -0500 Subject: [PATCH 01/10] snapshot refactoring --- store/commitment/iavl/tree.go | 148 ++++++++++++++++++++++++++- store/commitment/iavl/tree_test.go | 68 +++++++++++- store/pruning/manager_test.go | 2 +- store/root/store_test.go | 2 +- store/snapshots/helpers_test.go | 92 ++++++----------- store/snapshots/manager.go | 52 +++++++--- store/snapshots/manager_test.go | 38 +++---- store/snapshots/types/snapshotter.go | 36 +++---- store/storage/sqlite/db.go | 3 + 9 files changed, 313 insertions(+), 128 deletions(-) diff --git a/store/commitment/iavl/tree.go b/store/commitment/iavl/tree.go index 411eb708c1a4..01956717fe0b 100644 --- a/store/commitment/iavl/tree.go +++ b/store/commitment/iavl/tree.go @@ -2,27 +2,37 @@ package iavl import ( "fmt" + "io" + "math" dbm "github.com/cosmos/cosmos-db" + protoio "github.com/cosmos/gogoproto/io" "github.com/cosmos/iavl" ics23 "github.com/cosmos/ics23/go" log "cosmossdk.io/log" "cosmossdk.io/store/v2" + snapshottypes "cosmossdk.io/store/v2/snapshots/types" ) var _ store.Committer = (*IavlTree)(nil) +var _ snapshottypes.CommitSnapshotter = (*IavlTree)(nil) + // IavlTree is a wrapper around iavl.MutableTree. type IavlTree struct { tree *iavl.MutableTree + + // storeKey is the identifier of the store. + storeKey string } // NewIavlTree creates a new IavlTree instance. -func NewIavlTree(db dbm.DB, logger log.Logger, cfg *Config) *IavlTree { +func NewIavlTree(db dbm.DB, logger log.Logger, storeKey string, cfg *Config) *IavlTree { tree := iavl.NewMutableTree(db, cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger) return &IavlTree{ - tree: tree, + tree: tree, + storeKey: storeKey, } } @@ -83,6 +93,140 @@ func (t *IavlTree) Prune(version uint64) error { return t.tree.DeleteVersionsTo(int64(version)) } +// Snapshot implements snapshottypes.CommitSnapshotter. The snapshot output for a given format must be +// identical across nodes such that chunks from different sources fit together. +func (t *IavlTree) Snapshot(version uint64, protoWriter protoio.Writer) error { + if version == 0 { + return fmt.Errorf("the snapshot version must be greater than 0") + } + latestVersion := t.GetLatestVersion() + if version > latestVersion { + return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion) + } + + tree, err := t.tree.GetImmutable(int64(version)) + if err != nil { + return fmt.Errorf("failed to get immutable tree for version %d: %w", version, err) + } + + exporter, err := tree.Export() + if err != nil { + return fmt.Errorf("failed to export tree for version %d: %w", version, err) + } + + defer exporter.Close() + + err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_Store{ + Store: &snapshottypes.SnapshotStoreItem{ + Name: t.storeKey, + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to write store name: %w", err) + } + + for { + node, err := exporter.Next() + if err == iavl.ErrorExportDone { + break + } else if err != nil { + return fmt.Errorf("failed to get the next export node: %w", err) + } + if err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_IAVL{ + IAVL: &snapshottypes.SnapshotIAVLItem{ + Key: node.Key, + Value: node.Value, + Height: int32(node.Height), + Version: node.Version, + }, + }, + }); err != nil { + return fmt.Errorf("failed to write iavl node: %w", err) + } + } + + return nil +} + +// Restore implements snapshottypes.CommitSnapshotter. +// returns next snapshot item and error. +func (t *IavlTree) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshottypes.SnapshotItem, error) { + var importer *iavl.Importer + var snapshotItem snapshottypes.SnapshotItem +loop: + for { + snapshotItem = snapshottypes.SnapshotItem{} + err := protoReader.ReadMsg(&snapshotItem) + if err == io.EOF { + break + } else if err != nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) + } + + switch item := snapshotItem.Item.(type) { + case *snapshottypes.SnapshotItem_Store: + t.storeKey = item.Store.Name + importer, err = t.tree.Import(int64(version)) + if err != nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err) + } + defer importer.Close() + + case *snapshottypes.SnapshotItem_IAVL: + if importer == nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") + } + if item.IAVL.Height > math.MaxInt8 { + return snapshottypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", + item.IAVL.Height, math.MaxInt8) + } + node := &iavl.ExportNode{ + Key: item.IAVL.Key, + Value: item.IAVL.Value, + Height: int8(item.IAVL.Height), + Version: item.IAVL.Version, + } + // Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does + // not allow nil keys nor nil values for leaf nodes, so we can always set them to empty. + if node.Key == nil { + node.Key = []byte{} + } + if node.Height == 0 { + if node.Value == nil { + node.Value = []byte{} + } + // If the node is a leaf node, it will be written to the storage. + chStorage <- &store.KVPair{ + StoreKey: t.storeKey, + Key: node.Key, + Value: node.Value, + } + } + err := importer.Add(node) + if err != nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err) + } + default: + break loop + } + } + + if importer != nil { + err := importer.Commit() + if err != nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) + } + importer.Close() + } + + _, err := t.tree.LoadVersion(int64(version)) + + return snapshotItem, err +} + // Close closes the iavl tree. func (t *IavlTree) Close() error { return nil diff --git a/store/commitment/iavl/tree_test.go b/store/commitment/iavl/tree_test.go index 794652393988..58e41dc8040b 100644 --- a/store/commitment/iavl/tree_test.go +++ b/store/commitment/iavl/tree_test.go @@ -1,6 +1,8 @@ package iavl import ( + "fmt" + "io" "testing" dbm "github.com/cosmos/cosmos-db" @@ -8,12 +10,14 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots" + snapshottypes "cosmossdk.io/store/v2/snapshots/types" ) -func generateTree(treeType string) *IavlTree { +func generateTree(storeKey string) *IavlTree { cfg := DefaultConfig() db := dbm.NewMemDB() - return NewIavlTree(db, log.NewNopLogger(), cfg) + return NewIavlTree(db, log.NewNopLogger(), storeKey, cfg) } func TestIavlTree(t *testing.T) { @@ -90,3 +94,63 @@ func TestIavlTree(t *testing.T) { // close the db require.NoError(t, tree.Close()) } + +func TestSnapshotter(t *testing.T) { + // generate a new tree + storeKey := "store" + tree := generateTree(storeKey) + require.NotNil(t, tree) + + latestVersion := uint64(10) + kvCount := 10 + for i := uint64(1); i <= latestVersion; i++ { + cs := store.NewChangeset() + for j := 0; j < kvCount; j++ { + key := []byte(fmt.Sprintf("key-%d-%d", i, j)) + value := []byte(fmt.Sprintf("value-%d-%d", i, j)) + cs.Add(key, value) + } + err := tree.WriteBatch(cs) + require.NoError(t, err) + + _, err = tree.Commit() + require.NoError(t, err) + } + + latestHash := tree.WorkingHash() + + // create a snapshot + dummyExtensionItem := snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_Extension{ + Extension: &snapshottypes.SnapshotExtensionMeta{ + Name: "test", + Format: 1, + }, + }, + } + target := generateTree("") + chunks := make(chan io.ReadCloser, kvCount*int(latestVersion)) + go func() { + streamWriter := snapshots.NewStreamWriter(chunks) + require.NotNil(t, streamWriter) + defer streamWriter.Close() + err := tree.Snapshot(latestVersion, streamWriter) + require.NoError(t, err) + // write an extension metadata + err = streamWriter.WriteMsg(&dummyExtensionItem) + require.NoError(t, err) + }() + + streamReader, err := snapshots.NewStreamReader(chunks) + chStorage := make(chan *store.KVPair, 100) + require.NoError(t, err) + nextItem, err := target.Restore(latestVersion, snapshottypes.CurrentFormat, streamReader, chStorage) + require.NoError(t, err) + require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension()) + + // check the store key + require.Equal(t, storeKey, target.storeKey) + + // check the restored tree hash + require.Equal(t, latestHash, target.WorkingHash()) +} diff --git a/store/pruning/manager_test.go b/store/pruning/manager_test.go index 10c5966241c3..816d963c4d12 100644 --- a/store/pruning/manager_test.go +++ b/store/pruning/manager_test.go @@ -31,7 +31,7 @@ func (s *PruningTestSuite) SetupTest() { ss, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) - sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) + sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, "", iavl.DefaultConfig()) s.manager = NewManager(noopLog, ss, sc) s.ss = ss diff --git a/store/root/store_test.go b/store/root/store_test.go index ba372f965e96..f5ec3fb5cc16 100644 --- a/store/root/store_test.go +++ b/store/root/store_test.go @@ -30,7 +30,7 @@ func (s *RootStoreTestSuite) SetupTest() { ss, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) - sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) + sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, defaultStoreKey, iavl.DefaultConfig()) rs, err := New(noopLog, 1, ss, sc) s.Require().NoError(err) diff --git a/store/snapshots/helpers_test.go b/store/snapshots/helpers_test.go index 7c6cf04bcd72..61cbb59022a6 100644 --- a/store/snapshots/helpers_test.go +++ b/store/snapshots/helpers_test.go @@ -17,6 +17,7 @@ import ( errorsmod "cosmossdk.io/errors" "cosmossdk.io/log" + "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/snapshots" snapshottypes "cosmossdk.io/store/v2/snapshots/types" ) @@ -105,14 +106,12 @@ func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]b return chunks } -type mockSnapshotter struct { - items [][]byte - prunedHeights map[int64]struct{} - snapshotInterval uint64 +type mockCommitSnapshotter struct { + items [][]byte } -func (m *mockSnapshotter) Restore( - height uint64, format uint32, protoReader protoio.Reader, +func (m *mockCommitSnapshotter) Restore( + height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair, ) (snapshottypes.SnapshotItem, error) { if format == 0 { return snapshottypes.SnapshotItem{}, snapshottypes.ErrUnknownFormat @@ -141,7 +140,7 @@ func (m *mockSnapshotter) Restore( return item, nil } -func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { +func (m *mockCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { for _, item := range m.items { if err := snapshottypes.WriteExtensionPayload(protoWriter, item); err != nil { return err @@ -150,68 +149,50 @@ func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) er return nil } -func (m *mockSnapshotter) SnapshotFormat() uint32 { +func (m *mockCommitSnapshotter) SnapshotFormat() uint32 { return snapshottypes.CurrentFormat } -func (m *mockSnapshotter) SupportedFormats() []uint32 { +func (m *mockCommitSnapshotter) SupportedFormats() []uint32 { return []uint32{snapshottypes.CurrentFormat} } -func (m *mockSnapshotter) PruneSnapshotHeight(height int64) { - m.prunedHeights[height] = struct{}{} -} - -func (m *mockSnapshotter) GetSnapshotInterval() uint64 { - return m.snapshotInterval -} +type mockStorageSnapshotter struct{} -func (m *mockSnapshotter) SetSnapshotInterval(snapshotInterval uint64) { - m.snapshotInterval = snapshotInterval +func (m *mockStorageSnapshotter) Restore(version uint64, chStorage <-chan *store.KVPair) error { + return nil } -type mockErrorSnapshotter struct{} +type mockErrorCommitSnapshotter struct{} -var _ snapshottypes.Snapshotter = (*mockErrorSnapshotter)(nil) +var _ snapshottypes.CommitSnapshotter = (*mockErrorCommitSnapshotter)(nil) -func (m *mockErrorSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { +func (m *mockErrorCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { return errors.New("mock snapshot error") } -func (m *mockErrorSnapshotter) Restore( - height uint64, format uint32, protoReader protoio.Reader, +func (m *mockErrorCommitSnapshotter) Restore( + height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair, ) (snapshottypes.SnapshotItem, error) { return snapshottypes.SnapshotItem{}, errors.New("mock restore error") } -func (m *mockErrorSnapshotter) SnapshotFormat() uint32 { +func (m *mockErrorCommitSnapshotter) SnapshotFormat() uint32 { return snapshottypes.CurrentFormat } -func (m *mockErrorSnapshotter) SupportedFormats() []uint32 { +func (m *mockErrorCommitSnapshotter) SupportedFormats() []uint32 { return []uint32{snapshottypes.CurrentFormat} } -func (m *mockErrorSnapshotter) PruneSnapshotHeight(height int64) { -} - -func (m *mockErrorSnapshotter) GetSnapshotInterval() uint64 { - return 0 -} - -func (m *mockErrorSnapshotter) SetSnapshotInterval(snapshotInterval uint64) { -} - // setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1. // The snapshot will complete when the returned closer is called. func setupBusyManager(t *testing.T) *snapshots.Manager { t.Helper() store, err := snapshots.NewStore(db.NewMemDB(), t.TempDir()) require.NoError(t, err) - hung := newHungSnapshotter() - hung.SetSnapshotInterval(opts.Interval) - mgr := snapshots.NewManager(store, opts, hung, nil, log.NewNopLogger()) - require.Equal(t, opts.Interval, hung.snapshotInterval) + hung := newHungCommitSnapshotter() + mgr := snapshots.NewManager(store, opts, hung, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) // Channel to ensure the test doesn't finish until the goroutine is done. // Without this, there are intermittent test failures about @@ -222,8 +203,6 @@ func setupBusyManager(t *testing.T) *snapshots.Manager { defer close(done) _, err := mgr.Create(1) require.NoError(t, err) - _, didPruneHeight := hung.prunedHeights[1] - require.True(t, didPruneHeight) }() time.Sleep(10 * time.Millisecond) @@ -236,39 +215,28 @@ func setupBusyManager(t *testing.T) *snapshots.Manager { return mgr } -// hungSnapshotter can be used to test operations in progress. Call close to end the snapshot. -type hungSnapshotter struct { - ch chan struct{} - prunedHeights map[int64]struct{} - snapshotInterval uint64 +// hungCommitSnapshotter can be used to test operations in progress. Call close to end the snapshot. +type hungCommitSnapshotter struct { + ch chan struct{} } -func newHungSnapshotter() *hungSnapshotter { - return &hungSnapshotter{ - ch: make(chan struct{}), - prunedHeights: make(map[int64]struct{}), +func newHungCommitSnapshotter() *hungCommitSnapshotter { + return &hungCommitSnapshotter{ + ch: make(chan struct{}), } } -func (m *hungSnapshotter) Close() { +func (m *hungCommitSnapshotter) Close() { close(m.ch) } -func (m *hungSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { +func (m *hungCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { <-m.ch return nil } -func (m *hungSnapshotter) PruneSnapshotHeight(height int64) { - m.prunedHeights[height] = struct{}{} -} - -func (m *hungSnapshotter) SetSnapshotInterval(snapshotInterval uint64) { - m.snapshotInterval = snapshotInterval -} - -func (m *hungSnapshotter) Restore( - height uint64, format uint32, protoReader protoio.Reader, +func (m *hungCommitSnapshotter) Restore( + height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair, ) (snapshottypes.SnapshotItem, error) { panic("not implemented") } diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 8dd1381e1e2b..4d081b6a5b89 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -35,9 +35,12 @@ type Manager struct { // store is the snapshot store where all completed snapshots are persisted. store *Store opts types.SnapshotOptions - // multistore is the store from which snapshots are taken. - multistore types.Snapshotter - logger log.Logger + // commitSnapshotter is the snapshotter for the commitment state. + commitSnapshotter types.CommitSnapshotter + // storageSnapshotter is the snapshotter for the storage state. + storageSnapshotter types.StorageSnapshotter + + logger log.Logger mtx sync.Mutex operation operation @@ -62,8 +65,9 @@ const ( opPrune operation = "prune" opRestore operation = "restore" - chunkBufferSize = 4 - chunkIDBufferSize = 1024 + chunkBufferSize = 4 + chunkIDBufferSize = 1024 + defaultStorageChannelBufferSize = 1024 snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit ) @@ -71,16 +75,17 @@ const ( var ErrOptsZeroSnapshotInterval = errors.New("snaphot-interval must not be 0") // NewManager creates a new manager. -func NewManager(store *Store, opts types.SnapshotOptions, multistore types.Snapshotter, extensions map[string]types.ExtensionSnapshotter, logger log.Logger) *Manager { +func NewManager(store *Store, opts types.SnapshotOptions, commitSnapshotter types.CommitSnapshotter, storageSnapshotter types.StorageSnapshotter, extensions map[string]types.ExtensionSnapshotter, logger log.Logger) *Manager { if extensions == nil { extensions = map[string]types.ExtensionSnapshotter{} } return &Manager{ - store: store, - opts: opts, - multistore: multistore, - extensions: extensions, - logger: logger, + store: store, + opts: opts, + commitSnapshotter: commitSnapshotter, + storageSnapshotter: storageSnapshotter, + extensions: extensions, + logger: logger, } } @@ -164,8 +169,6 @@ func (m *Manager) Create(height uint64) (*types.Snapshot, error) { return nil, errorsmod.Wrap(store.ErrLogic, "no snapshot store configured") } - defer m.multistore.PruneSnapshotHeight(int64(height)) - err := m.begin(opSnapshot) if err != nil { return nil, err @@ -201,7 +204,7 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) { } }() - if err := m.multistore.Snapshot(height, streamWriter); err != nil { + if err := m.commitSnapshotter.Snapshot(height, streamWriter); err != nil { streamWriter.CloseWithError(err) return } @@ -363,7 +366,20 @@ func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io. return payload.Payload, nil } - nextItem, err = m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader) + // chStorage is the channel to pass the KV pairs to the storage snapshotter. + chStorage := make(chan *store.KVPair, defaultStorageChannelBufferSize) + defer close(chStorage) + + storageErrs := make(chan error, 1) + go func() { + err := m.storageSnapshotter.Restore(snapshot.Height, chStorage) + if err != nil { + storageErrs <- err + } + close(storageErrs) + }() + + nextItem, err = m.commitSnapshotter.Restore(snapshot.Height, snapshot.Format, streamReader, chStorage) if err != nil { return errorsmod.Wrap(err, "multistore restore") } @@ -393,6 +409,12 @@ func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io. return errorsmod.Wrapf(err, "extension %s don't exhausted payload stream", metadata.Name) } } + + // wait for storage snapshotter to complete + if err := <-storageErrs; err != nil { + return errorsmod.Wrap(err, "storage snapshotter") + } + return nil } diff --git a/store/snapshots/manager_test.go b/store/snapshots/manager_test.go index c3276d01ed7e..34861d5643d2 100644 --- a/store/snapshots/manager_test.go +++ b/store/snapshots/manager_test.go @@ -17,10 +17,9 @@ var opts = types.NewSnapshotOptions(1500, 2) func TestManager_List(t *testing.T) { store := setupStore(t) - snapshotter := &mockSnapshotter{} - snapshotter.SetSnapshotInterval(opts.Interval) - manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) - require.Equal(t, opts.Interval, snapshotter.GetSnapshotInterval()) + commitSnapshotter := &mockCommitSnapshotter{} + storageSnapshotter := &mockStorageSnapshotter{} + manager := snapshots.NewManager(store, opts, commitSnapshotter, storageSnapshotter, nil, log.NewNopLogger()) mgrList, err := manager.List() require.NoError(t, err) @@ -41,7 +40,7 @@ func TestManager_List(t *testing.T) { func TestManager_LoadChunk(t *testing.T) { store := setupStore(t) - manager := snapshots.NewManager(store, opts, &mockSnapshotter{}, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, &mockCommitSnapshotter{}, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) // Existing chunk should return body chunk, err := manager.LoadChunk(2, 1, 1) @@ -67,14 +66,13 @@ func TestManager_Take(t *testing.T) { {4, 5, 6}, {7, 8, 9}, } - snapshotter := &mockSnapshotter{ - items: items, - prunedHeights: make(map[int64]struct{}), + commitSnapshotter := &mockCommitSnapshotter{ + items: items, } extSnapshotter := newExtSnapshotter(10) expectChunks := snapshotItems(items, extSnapshotter) - manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, commitSnapshotter, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) err := manager.RegisterExtensions(extSnapshotter) require.NoError(t, err) @@ -85,18 +83,14 @@ func TestManager_Take(t *testing.T) { // creating a snapshot at a lower height than the latest should error _, err = manager.Create(3) require.Error(t, err) - _, didPruneHeight := snapshotter.prunedHeights[3] - require.True(t, didPruneHeight) // creating a snapshot at a higher height should be fine, and should return it snapshot, err := manager.Create(5) require.NoError(t, err) - _, didPruneHeight = snapshotter.prunedHeights[5] - require.True(t, didPruneHeight) assert.Equal(t, &types.Snapshot{ Height: 5, - Format: snapshotter.SnapshotFormat(), + Format: commitSnapshotter.SnapshotFormat(), Chunks: 1, Hash: []uint8{0xc5, 0xf7, 0xfe, 0xea, 0xd3, 0x4d, 0x3e, 0x87, 0xff, 0x41, 0xa2, 0x27, 0xfa, 0xcb, 0x38, 0x17, 0xa, 0x5, 0xeb, 0x27, 0x4e, 0x16, 0x5e, 0xf3, 0xb2, 0x8b, 0x47, 0xd1, 0xe6, 0x94, 0x7e, 0x8b}, Metadata: types.Metadata{ @@ -117,9 +111,7 @@ func TestManager_Take(t *testing.T) { func TestManager_Prune(t *testing.T) { store := setupStore(t) - snapshotter := &mockSnapshotter{} - snapshotter.SetSnapshotInterval(opts.Interval) - manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, &mockCommitSnapshotter{}, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) pruned, err := manager.Prune(2) require.NoError(t, err) @@ -137,11 +129,9 @@ func TestManager_Prune(t *testing.T) { func TestManager_Restore(t *testing.T) { store := setupStore(t) - target := &mockSnapshotter{ - prunedHeights: make(map[int64]struct{}), - } + target := &mockCommitSnapshotter{} extSnapshotter := newExtSnapshotter(0) - manager := snapshots.NewManager(store, opts, target, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, target, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) err := manager.RegisterExtensions(extSnapshotter) require.NoError(t, err) @@ -191,8 +181,6 @@ func TestManager_Restore(t *testing.T) { // While the restore is in progress, any other operations fail _, err = manager.Create(4) require.Error(t, err) - _, didPruneHeight := target.prunedHeights[4] - require.True(t, didPruneHeight) _, err = manager.Prune(1) require.Error(t, err) @@ -248,10 +236,10 @@ func TestManager_Restore(t *testing.T) { } func TestManager_TakeError(t *testing.T) { - snapshotter := &mockErrorSnapshotter{} + snapshotter := &mockErrorCommitSnapshotter{} store, err := snapshots.NewStore(db.NewMemDB(), GetTempDir(t)) require.NoError(t, err) - manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, snapshotter, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) _, err = manager.Create(1) require.Error(t, err) diff --git a/store/snapshots/types/snapshotter.go b/store/snapshots/types/snapshotter.go index de9fcfe3d3ff..ed64b8960e06 100644 --- a/store/snapshots/types/snapshotter.go +++ b/store/snapshots/types/snapshotter.go @@ -2,28 +2,24 @@ package types import ( protoio "github.com/cosmos/gogoproto/io" + + "cosmossdk.io/store/v2" ) -// Snapshotter is something that can create and restore snapshots, consisting of streamed binary -// chunks - all of which must be read from the channel and closed. If an unsupported format is -// given, it must return ErrUnknownFormat (possibly wrapped with fmt.Errorf). -type Snapshotter interface { - // Snapshot writes snapshot items into the protobuf writer. - Snapshot(height uint64, protoWriter protoio.Writer) error - - // PruneSnapshotHeight prunes the given height according to the prune strategy. - // If PruneNothing, this is a no-op. - // If other strategy, this height is persisted until it is - // less than - KeepRecent and % Interval == 0 - PruneSnapshotHeight(height int64) - - // SetSnapshotInterval sets the interval at which the snapshots are taken. - // It is used by the store that implements the Snapshotter interface - // to determine which heights to retain until after the snapshot is complete. - SetSnapshotInterval(snapshotInterval uint64) - - // Restore restores a state snapshot, taking the reader of protobuf message stream as input. - Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error) +// CommitSnapshotter defines an API for creating and restoring snapshots of the +// commitment state. +type CommitSnapshotter interface { + // Snapshot writes a snapshot of the commitment state at the given version. + Snapshot(version uint64, protoWriter protoio.Writer) error + + // Restore restores the commitment state from the snapshot reader. + Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (SnapshotItem, error) +} + +// StorageSnapshotter defines an API for restoring snapshots of the storage state. +type StorageSnapshotter interface { + // Restore restores the storage state from the given channel. + Restore(version uint64, chStorage <-chan *store.KVPair) error } // ExtensionPayloadReader read extension payloads, diff --git a/store/storage/sqlite/db.go b/store/storage/sqlite/db.go index 0886b023f808..8a4318f7de1f 100644 --- a/store/storage/sqlite/db.go +++ b/store/storage/sqlite/db.go @@ -11,6 +11,7 @@ import ( _ "modernc.org/sqlite" "cosmossdk.io/store/v2" + snapshottypes "cosmossdk.io/store/v2/snapshots/types" ) const ( @@ -41,6 +42,8 @@ const ( var _ store.VersionedDatabase = (*Database)(nil) +var _ snapshottypes.StorageSnapshotter = (*Database)(nil) + type Database struct { storage *sql.DB } From c0dcf8fedc1638fbb9b216c1697e63268d11663d Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Mon, 13 Nov 2023 12:52:52 -0500 Subject: [PATCH 02/10] storage snapshot api --- store/go.mod | 6 ++---- store/storage/sqlite/db.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/store/go.mod b/store/go.mod index a99b61f04b4c..64377e044592 100644 --- a/store/go.mod +++ b/store/go.mod @@ -13,8 +13,8 @@ require ( github.com/cosmos/gogoproto v1.4.11 github.com/cosmos/iavl v1.0.0 github.com/cosmos/ics23/go v0.10.0 - github.com/mattn/go-sqlite3 v1.14.17 github.com/linxGnu/grocksdb v1.8.5 + github.com/mattn/go-sqlite3 v1.14.17 github.com/stretchr/testify v1.8.4 github.com/tidwall/btree v1.7.0 golang.org/x/exp v0.0.0-20231006140011-7918f672742d @@ -56,13 +56,11 @@ require ( github.com/sasha-s/go-deadlock v0.3.1 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect - golang.org/x/sync v0.4.0 // indirect golang.org/x/crypto v0.15.0 // indirect - golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.17.0 // indirect + golang.org/x/sync v0.4.0 // indirect golang.org/x/sys v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect google.golang.org/grpc v1.59.0 // indirect google.golang.org/protobuf v1.31.0 // indirect diff --git a/store/storage/sqlite/db.go b/store/storage/sqlite/db.go index 5d7e9f5ebe16..502cecafd35b 100644 --- a/store/storage/sqlite/db.go +++ b/store/storage/sqlite/db.go @@ -20,6 +20,8 @@ const ( reservedStoreKey = "_RESERVED_" keyLatestHeight = "latest_height" + defaultBatchBufferSize = 100000 + latestVersionStmt = ` INSERT INTO state_storage(store_key, key, value, version) VALUES(?, ?, ?, ?) @@ -212,6 +214,40 @@ func (db *Database) ReverseIterator(storeKey string, version uint64, start, end return newIterator(db.storage, storeKey, version, start, end, true) } +// Restore implements the StorageSnapshotter interface. +func (db *Database) Restore(version uint64, chStorage <-chan *store.KVPair) error { + latestVersion, err := db.GetLatestVersion() + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + if version <= latestVersion { + return fmt.Errorf("the snapshot version %d is not greater than latest version %d", version, latestVersion) + } + + b, err := NewBatch(db.storage, version) + if err != nil { + return err + } + + for kvPair := range chStorage { + if err := b.Set(kvPair.StoreKey, kvPair.Key, kvPair.Value); err != nil { + return err + } + + if b.Size() > defaultBatchBufferSize { + if err := b.Write(); err != nil { + return err + } + } + } + + if err := b.Write(); err != nil { + return err + } + + return db.SetLatestVersion(version) +} + func (db *Database) PrintRowsDebug() { stmt, err := db.storage.Prepare("SELECT store_key, key, value, version, tombstone FROM state_storage") if err != nil { From 0f462bec53192228c3f1b6a3e0567217bb0939d1 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Mon, 13 Nov 2023 15:09:41 -0500 Subject: [PATCH 03/10] comments --- store/commitment/iavl/tree.go | 21 ++++++++++++--------- store/snapshots/manager.go | 4 ---- store/storage/sqlite/db.go | 7 +++++-- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/store/commitment/iavl/tree.go b/store/commitment/iavl/tree.go index 01956717fe0b..ecaf6b12c923 100644 --- a/store/commitment/iavl/tree.go +++ b/store/commitment/iavl/tree.go @@ -1,6 +1,7 @@ package iavl import ( + "errors" "fmt" "io" "math" @@ -93,12 +94,12 @@ func (t *IavlTree) Prune(version uint64) error { return t.tree.DeleteVersionsTo(int64(version)) } -// Snapshot implements snapshottypes.CommitSnapshotter. The snapshot output for a given format must be -// identical across nodes such that chunks from different sources fit together. +// Snapshot implements snapshottypes.CommitSnapshotter. func (t *IavlTree) Snapshot(version uint64, protoWriter protoio.Writer) error { if version == 0 { return fmt.Errorf("the snapshot version must be greater than 0") } + latestVersion := t.GetLatestVersion() if version > latestVersion { return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion) @@ -129,11 +130,12 @@ func (t *IavlTree) Snapshot(version uint64, protoWriter protoio.Writer) error { for { node, err := exporter.Next() - if err == iavl.ErrorExportDone { + if errors.Is(err, iavl.ErrorExportDone) { break } else if err != nil { return fmt.Errorf("failed to get the next export node: %w", err) } + if err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ Item: &snapshottypes.SnapshotItem_IAVL{ IAVL: &snapshottypes.SnapshotIAVLItem{ @@ -152,15 +154,17 @@ func (t *IavlTree) Snapshot(version uint64, protoWriter protoio.Writer) error { } // Restore implements snapshottypes.CommitSnapshotter. -// returns next snapshot item and error. func (t *IavlTree) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshottypes.SnapshotItem, error) { - var importer *iavl.Importer - var snapshotItem snapshottypes.SnapshotItem + var ( + importer *iavl.Importer + snapshotItem snapshottypes.SnapshotItem + ) + loop: for { snapshotItem = snapshottypes.SnapshotItem{} err := protoReader.ReadMsg(&snapshotItem) - if err == io.EOF { + if errors.Is(err, io.EOF) { break } else if err != nil { return snapshottypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) @@ -179,7 +183,7 @@ loop: if importer == nil { return snapshottypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") } - if item.IAVL.Height > math.MaxInt8 { + if item.IAVL.Height > int32(math.MaxInt8) { return snapshottypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", item.IAVL.Height, math.MaxInt8) } @@ -219,7 +223,6 @@ loop: if err != nil { return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) } - importer.Close() } _, err := t.tree.LoadVersion(int64(version)) diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 4d081b6a5b89..cd7ecded467c 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -165,10 +165,6 @@ func (m *Manager) GetSnapshotBlockRetentionHeights() int64 { // Create creates a snapshot and returns its metadata. func (m *Manager) Create(height uint64) (*types.Snapshot, error) { - if m == nil { - return nil, errorsmod.Wrap(store.ErrLogic, "no snapshot store configured") - } - err := m.begin(opSnapshot) if err != nil { return nil, err diff --git a/store/storage/sqlite/db.go b/store/storage/sqlite/db.go index 502cecafd35b..12e6d53d26cf 100644 --- a/store/storage/sqlite/db.go +++ b/store/storage/sqlite/db.go @@ -20,6 +20,7 @@ const ( reservedStoreKey = "_RESERVED_" keyLatestHeight = "latest_height" + // TODO: it is a random number, need to be tuned defaultBatchBufferSize = 100000 latestVersionStmt = ` @@ -241,8 +242,10 @@ func (db *Database) Restore(version uint64, chStorage <-chan *store.KVPair) erro } } - if err := b.Write(); err != nil { - return err + if b.Size() > 0 { + if err := b.Write(); err != nil { + return err + } } return db.SetLatestVersion(version) From 9f876dc81cd2e25d3107dbf9f7a5f5ae467c7cd4 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Mon, 13 Nov 2023 15:22:41 -0500 Subject: [PATCH 04/10] fix tests --- store/snapshots/manager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index cd7ecded467c..abdf545d4096 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -165,6 +165,10 @@ func (m *Manager) GetSnapshotBlockRetentionHeights() int64 { // Create creates a snapshot and returns its metadata. func (m *Manager) Create(height uint64) (*types.Snapshot, error) { + if m == nil { + return nil, errorsmod.Wrap(store.ErrLogic, "Snatshot Manager is nil") + } + err := m.begin(opSnapshot) if err != nil { return nil, err From beb0ef475a5f83585e846c18ebc11a7654558a1e Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Mon, 13 Nov 2023 15:46:43 -0500 Subject: [PATCH 05/10] make more reliable tests --- store/pruning/manager_test.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/store/pruning/manager_test.go b/store/pruning/manager_test.go index 102e6ffedd18..7ee383a43d8d 100644 --- a/store/pruning/manager_test.go +++ b/store/pruning/manager_test.go @@ -26,14 +26,12 @@ func TestPruningTestSuite(t *testing.T) { } func (s *PruningTestSuite) SetupTest() { - noopLog := log.NewNopLogger() - ss, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) - sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, "", iavl.DefaultConfig()) + sc := iavl.NewIavlTree(dbm.NewMemDB(), log.NewNopLogger(), "", iavl.DefaultConfig()) - s.manager = NewManager(noopLog, ss, sc) + s.manager = NewManager(log.NewTestLogger(s.T()), ss, sc) s.ss = ss s.sc = sc } @@ -49,12 +47,16 @@ func (s *PruningTestSuite) TestPruning() { s.manager.Start() latestVersion := uint64(100) + kvCount := 10 - // write 10 batches + // write batches for i := uint64(0); i < latestVersion; i++ { version := i + 1 cs := store.NewChangeset() - cs.Add([]byte("key"), []byte(fmt.Sprintf("value%d", version))) + for j := 0; j < kvCount; j++ { + cs.Add([]byte(fmt.Sprintf("key-%d", j)), []byte(fmt.Sprintf("value-%d-%d", version, j))) + } + err := s.sc.WriteBatch(cs) s.Require().NoError(err) _, err = s.sc.Commit() @@ -68,20 +70,20 @@ func (s *PruningTestSuite) TestPruning() { s.manager.Stop() // check the store for the version 96 - val, err := s.ss.Get("", latestVersion-4, []byte("key")) + val, err := s.ss.Get("", latestVersion-4, []byte("key-0")) s.Require().NoError(err) - s.Require().Equal([]byte("value96"), val) + s.Require().Equal([]byte("value-96-0"), val) // check the store for the version 50 val, err = s.ss.Get("", 50, []byte("key")) s.Require().NoError(err) s.Require().Nil(val) // check the commitment for the version 96 - proof, err := s.sc.GetProof(latestVersion-4, []byte("key")) + proof, err := s.sc.GetProof(latestVersion-4, []byte("key-0")) s.Require().NoError(err) s.Require().NotNil(proof.GetExist()) // check the commitment for the version 95 - proof, err = s.sc.GetProof(latestVersion-5, []byte("key")) + proof, err = s.sc.GetProof(latestVersion-5, []byte("key-0")) s.Require().Error(err) s.Require().Nil(proof) } From 5c2595222bd5f28a9846a22e04d30609480ba365 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Fri, 1 Dec 2023 15:16:30 -0500 Subject: [PATCH 06/10] refactor with commitment store --- store/changeset.go | 5 +- store/commitment/iavl/tree.go | 195 ++++++++++------------------- store/commitment/iavl/tree_test.go | 71 +---------- store/commitment/store.go | 138 ++++++++++++++++++++ store/commitment/store_test.go | 65 ++++++++++ store/commitment/tree.go | 23 ++++ store/pruning/manager_test.go | 1 - store/root/store_test.go | 2 +- 8 files changed, 296 insertions(+), 204 deletions(-) create mode 100644 store/commitment/store_test.go diff --git a/store/changeset.go b/store/changeset.go index b9ceaa50764f..6982344375af 100644 --- a/store/changeset.go +++ b/store/changeset.go @@ -4,8 +4,9 @@ package store // track writes. Deletion can be denoted by a nil value or explicitly by the // Delete field. type KVPair struct { - Key []byte - Value []byte + Key []byte + Value []byte + StoreKey string // Optional for snapshot restore } type KVPairs []KVPair diff --git a/store/commitment/iavl/tree.go b/store/commitment/iavl/tree.go index 37ee8af01857..755c0743d943 100644 --- a/store/commitment/iavl/tree.go +++ b/store/commitment/iavl/tree.go @@ -3,38 +3,28 @@ package iavl import ( "errors" "fmt" - "io" - "math" dbm "github.com/cosmos/cosmos-db" - protoio "github.com/cosmos/gogoproto/io" "github.com/cosmos/iavl" ics23 "github.com/cosmos/ics23/go" log "cosmossdk.io/log" - "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" - snapshottypes "cosmossdk.io/store/v2/snapshots/types" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) var _ commitment.Tree = (*IavlTree)(nil) -var _ snapshottypes.CommitSnapshotter = (*IavlTree)(nil) - // IavlTree is a wrapper around iavl.MutableTree. type IavlTree struct { tree *iavl.MutableTree - - // storeKey is the identifier of the store. - storeKey string } // NewIavlTree creates a new IavlTree instance. -func NewIavlTree(db dbm.DB, logger log.Logger, storeKey string, cfg *Config) *IavlTree { +func NewIavlTree(db dbm.DB, logger log.Logger, cfg *Config) *IavlTree { tree := iavl.NewMutableTree(db, cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger) return &IavlTree{ - tree: tree, - storeKey: storeKey, + tree: tree, } } @@ -89,143 +79,84 @@ func (t *IavlTree) Prune(version uint64) error { return t.tree.DeleteVersionsTo(int64(version)) } -// Snapshot implements snapshottypes.CommitSnapshotter. -func (t *IavlTree) Snapshot(version uint64, protoWriter protoio.Writer) error { - if version == 0 { - return fmt.Errorf("the snapshot version must be greater than 0") - } - - latestVersion := t.GetLatestVersion() - if version > latestVersion { - return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion) - } - +// Export exports the tree exporter at the given version. +func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) { tree, err := t.tree.GetImmutable(int64(version)) if err != nil { - return fmt.Errorf("failed to get immutable tree for version %d: %w", version, err) + return nil, err } - exporter, err := tree.Export() - if err != nil { - return fmt.Errorf("failed to export tree for version %d: %w", version, err) - } + return &Exporter{ + exporter: exporter, + }, err +} - defer exporter.Close() +// Import imports the tree importer at the given version. +func (t *IavlTree) Import(version uint64) (commitment.Importer, error) { + importer, err := t.tree.Import(int64(version)) + return &Importer{ + importer: importer, + }, err +} - err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ - Item: &snapshottypes.SnapshotItem_Store{ - Store: &snapshottypes.SnapshotStoreItem{ - Name: t.storeKey, - }, - }, - }) - if err != nil { - return fmt.Errorf("failed to write store name: %w", err) - } +// Close closes the iavl tree. +func (t *IavlTree) Close() error { + return nil +} - for { - node, err := exporter.Next() - if errors.Is(err, iavl.ErrorExportDone) { - break - } else if err != nil { - return fmt.Errorf("failed to get the next export node: %w", err) - } +// Exporter is a wrapper around iavl.Exporter. +type Exporter struct { + exporter *iavl.Exporter +} - if err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ - Item: &snapshottypes.SnapshotItem_IAVL{ - IAVL: &snapshottypes.SnapshotIAVLItem{ - Key: node.Key, - Value: node.Value, - Height: int32(node.Height), - Version: node.Version, - }, - }, - }); err != nil { - return fmt.Errorf("failed to write iavl node: %w", err) +// Next returns the next item in the exporter. +func (e *Exporter) Next() (*snapshotstypes.SnapshotIAVLItem, error) { + item, err := e.exporter.Next() + if err != nil { + if errors.Is(err, iavl.ErrorExportDone) { + return nil, commitment.ErrorExportDone } + return nil, err } - return nil + return &snapshotstypes.SnapshotIAVLItem{ + Key: item.Key, + Value: item.Value, + Version: item.Version, + Height: int32(item.Height), + }, nil } -// Restore implements snapshottypes.CommitSnapshotter. -func (t *IavlTree) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshottypes.SnapshotItem, error) { - var ( - importer *iavl.Importer - snapshotItem snapshottypes.SnapshotItem - ) - -loop: - for { - snapshotItem = snapshottypes.SnapshotItem{} - err := protoReader.ReadMsg(&snapshotItem) - if errors.Is(err, io.EOF) { - break - } else if err != nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) - } +// Close closes the exporter. +func (e *Exporter) Close() error { + e.exporter.Close() - switch item := snapshotItem.Item.(type) { - case *snapshottypes.SnapshotItem_Store: - t.storeKey = item.Store.Name - importer, err = t.tree.Import(int64(version)) - if err != nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err) - } - defer importer.Close() - - case *snapshottypes.SnapshotItem_IAVL: - if importer == nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") - } - if item.IAVL.Height > int32(math.MaxInt8) { - return snapshottypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", - item.IAVL.Height, math.MaxInt8) - } - node := &iavl.ExportNode{ - Key: item.IAVL.Key, - Value: item.IAVL.Value, - Height: int8(item.IAVL.Height), - Version: item.IAVL.Version, - } - // Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does - // not allow nil keys nor nil values for leaf nodes, so we can always set them to empty. - if node.Key == nil { - node.Key = []byte{} - } - if node.Height == 0 { - if node.Value == nil { - node.Value = []byte{} - } - // If the node is a leaf node, it will be written to the storage. - chStorage <- &store.KVPair{ - StoreKey: t.storeKey, - Key: node.Key, - Value: node.Value, - } - } - err := importer.Add(node) - if err != nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err) - } - default: - break loop - } - } + return nil +} - if importer != nil { - err := importer.Commit() - if err != nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) - } - } +// Importer is a wrapper around iavl.Importer. +type Importer struct { + importer *iavl.Importer +} - _, err := t.tree.LoadVersion(int64(version)) +// Add adds the given item to the importer. +func (i *Importer) Add(item *snapshotstypes.SnapshotIAVLItem) error { + return i.importer.Add(&iavl.ExportNode{ + Key: item.Key, + Value: item.Value, + Version: item.Version, + Height: int8(item.Height), + }) +} - return snapshotItem, err +// Commit commits the importer. +func (i *Importer) Commit() error { + return i.importer.Commit() } -// Close closes the iavl tree. -func (t *IavlTree) Close() error { +// Close closes the importer. +func (i *Importer) Close() error { + i.importer.Close() + return nil } diff --git a/store/commitment/iavl/tree_test.go b/store/commitment/iavl/tree_test.go index 60383bd45518..8c61291ba7aa 100644 --- a/store/commitment/iavl/tree_test.go +++ b/store/commitment/iavl/tree_test.go @@ -1,28 +1,23 @@ package iavl import ( - "fmt" - "io" "testing" dbm "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/require" "cosmossdk.io/log" - "cosmossdk.io/store/v2" - "cosmossdk.io/store/v2/snapshots" - snapshottypes "cosmossdk.io/store/v2/snapshots/types" ) -func generateTree(storeKey string) *IavlTree { +func generateTree() *IavlTree { cfg := DefaultConfig() db := dbm.NewMemDB() - return NewIavlTree(db, log.NewNopLogger(), storeKey, cfg) + return NewIavlTree(db, log.NewNopLogger(), cfg) } func TestIavlTree(t *testing.T) { // generate a new tree - tree := generateTree("iavl") + tree := generateTree() require.NotNil(t, tree) initVersion := tree.GetLatestVersion() @@ -85,63 +80,3 @@ func TestIavlTree(t *testing.T) { // close the db require.NoError(t, tree.Close()) } - -func TestSnapshotter(t *testing.T) { - // generate a new tree - storeKey := "store" - tree := generateTree(storeKey) - require.NotNil(t, tree) - - latestVersion := uint64(10) - kvCount := 10 - for i := uint64(1); i <= latestVersion; i++ { - cs := store.NewChangeset() - for j := 0; j < kvCount; j++ { - key := []byte(fmt.Sprintf("key-%d-%d", i, j)) - value := []byte(fmt.Sprintf("value-%d-%d", i, j)) - cs.Add(key, value) - } - err := tree.WriteBatch(cs) - require.NoError(t, err) - - _, err = tree.Commit() - require.NoError(t, err) - } - - latestHash := tree.WorkingHash() - - // create a snapshot - dummyExtensionItem := snapshottypes.SnapshotItem{ - Item: &snapshottypes.SnapshotItem_Extension{ - Extension: &snapshottypes.SnapshotExtensionMeta{ - Name: "test", - Format: 1, - }, - }, - } - target := generateTree("") - chunks := make(chan io.ReadCloser, kvCount*int(latestVersion)) - go func() { - streamWriter := snapshots.NewStreamWriter(chunks) - require.NotNil(t, streamWriter) - defer streamWriter.Close() - err := tree.Snapshot(latestVersion, streamWriter) - require.NoError(t, err) - // write an extension metadata - err = streamWriter.WriteMsg(&dummyExtensionItem) - require.NoError(t, err) - }() - - streamReader, err := snapshots.NewStreamReader(chunks) - chStorage := make(chan *store.KVPair, 100) - require.NoError(t, err) - nextItem, err := target.Restore(latestVersion, snapshottypes.CurrentFormat, streamReader, chStorage) - require.NoError(t, err) - require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension()) - - // check the store key - require.Equal(t, storeKey, target.storeKey) - - // check the restored tree hash - require.Equal(t, latestHash, target.WorkingHash()) -} diff --git a/store/commitment/store.go b/store/commitment/store.go index 32952ef58f89..b9f6fd5bca39 100644 --- a/store/commitment/store.go +++ b/store/commitment/store.go @@ -3,11 +3,15 @@ package commitment import ( "errors" "fmt" + "io" + "math" + protoio "github.com/cosmos/gogoproto/io" ics23 "github.com/cosmos/ics23/go" "cosmossdk.io/log" "cosmossdk.io/store/v2" + snapshottypes "cosmossdk.io/store/v2/snapshots/types" ) var _ store.Committer = (*CommitStore)(nil) @@ -127,6 +131,140 @@ func (c *CommitStore) Prune(version uint64) (ferr error) { return ferr } +// Snapshot implements snapshottypes.CommitSnapshotter. +func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error { + if version == 0 { + return fmt.Errorf("the snapshot version must be greater than 0") + } + + latestVersion, err := c.GetLatestVersion() + if err != nil { + return err + } + if version > latestVersion { + return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion) + } + + for storeKey, tree := range c.multiTrees { + exporter, err := tree.Export(version) + if err != nil { + return fmt.Errorf("failed to export tree for version %d: %w", version, err) + } + + defer exporter.Close() + + err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_Store{ + Store: &snapshottypes.SnapshotStoreItem{ + Name: storeKey, + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to write store name: %w", err) + } + + for { + item, err := exporter.Next() + if errors.Is(err, ErrorExportDone) { + break + } else if err != nil { + return fmt.Errorf("failed to get the next export node: %w", err) + } + + if err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_IAVL{ + IAVL: item, + }, + }); err != nil { + return fmt.Errorf("failed to write iavl node: %w", err) + } + } + } + + return nil +} + +// Restore implements snapshottypes.CommitSnapshotter. +func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshottypes.SnapshotItem, error) { + var ( + importer Importer + snapshotItem snapshottypes.SnapshotItem + storeKey string + ) + +loop: + for { + snapshotItem = snapshottypes.SnapshotItem{} + err := protoReader.ReadMsg(&snapshotItem) + if errors.Is(err, io.EOF) { + break + } else if err != nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) + } + + switch item := snapshotItem.Item.(type) { + case *snapshottypes.SnapshotItem_Store: + if importer != nil { + if err := importer.Commit(); err != nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) + } + importer.Close() + } + storeKey = item.Store.Name + tree := c.multiTrees[storeKey] + if tree == nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey) + } + importer, err = tree.Import(version) + if err != nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err) + } + defer importer.Close() + + case *snapshottypes.SnapshotItem_IAVL: + if importer == nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") + } + node := item.IAVL + if node.Height > int32(math.MaxInt8) { + return snapshottypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", + item.IAVL.Height, math.MaxInt8) + } + // Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does + // not allow nil keys nor nil values for leaf nodes, so we can always set them to empty. + if node.Key == nil { + node.Key = []byte{} + } + if node.Height == 0 { + if node.Value == nil { + node.Value = []byte{} + } + // If the node is a leaf node, it will be written to the storage. + chStorage <- &store.KVPair{ + Key: node.Key, + Value: node.Value, + StoreKey: storeKey, + } + } + err := importer.Add(node) + if err != nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err) + } + default: + break loop + } + } + + if importer != nil { + if err := importer.Commit(); err != nil { + return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) + } + } + + return snapshotItem, c.LoadVersion(version) +} + func (c *CommitStore) Close() (ferr error) { for _, tree := range c.multiTrees { if err := tree.Close(); err != nil { diff --git a/store/commitment/store_test.go b/store/commitment/store_test.go new file mode 100644 index 000000000000..f9e699d922f1 --- /dev/null +++ b/store/commitment/store_test.go @@ -0,0 +1,65 @@ +package commitment + +import ( + "testing" +) + +func TestSnapshotter(t *testing.T) { + // generate a new tree + // storeKey := "store" + // tree := generateTree(storeKey) + // require.NotNil(t, tree) + + // latestVersion := uint64(10) + // kvCount := 10 + // for i := uint64(1); i <= latestVersion; i++ { + // cs := store.NewChangeset() + // for j := 0; j < kvCount; j++ { + // key := []byte(fmt.Sprintf("key-%d-%d", i, j)) + // value := []byte(fmt.Sprintf("value-%d-%d", i, j)) + // cs.Add(key, value) + // } + // err := tree.WriteBatch(cs) + // require.NoError(t, err) + + // _, err = tree.Commit() + // require.NoError(t, err) + // } + + // latestHash := tree.WorkingHash() + + // // create a snapshot + // dummyExtensionItem := snapshottypes.SnapshotItem{ + // Item: &snapshottypes.SnapshotItem_Extension{ + // Extension: &snapshottypes.SnapshotExtensionMeta{ + // Name: "test", + // Format: 1, + // }, + // }, + // } + // target := generateTree("") + // chunks := make(chan io.ReadCloser, kvCount*int(latestVersion)) + // go func() { + // streamWriter := snapshots.NewStreamWriter(chunks) + // require.NotNil(t, streamWriter) + // defer streamWriter.Close() + // err := tree.Snapshot(latestVersion, streamWriter) + // require.NoError(t, err) + // // write an extension metadata + // err = streamWriter.WriteMsg(&dummyExtensionItem) + // require.NoError(t, err) + // }() + + // streamReader, err := snapshots.NewStreamReader(chunks) + // chStorage := make(chan *store.KVPair, 100) + // require.NoError(t, err) + // nextItem, err := target.Restore(latestVersion, snapshottypes.CurrentFormat, streamReader, chStorage) + // require.NoError(t, err) + // require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension()) + + // // check the store key + // require.Equal(t, storeKey, target.storeKey) + + // // check the restored tree hash + // require.Equal(t, latestHash, target.WorkingHash()) +} diff --git a/store/commitment/tree.go b/store/commitment/tree.go index b55c90c5fad1..86890e738e38 100644 --- a/store/commitment/tree.go +++ b/store/commitment/tree.go @@ -1,11 +1,17 @@ package commitment import ( + "errors" "io" ics23 "github.com/cosmos/ics23/go" + + snapshottypes "cosmossdk.io/store/v2/snapshots/types" ) +// ErrorExportDone is returned by Exporter.Next() when all items have been exported. +var ErrorExportDone = errors.New("export is complete") + // Tree is the interface that wraps the basic Tree methods. type Tree interface { Set(key, value []byte) error @@ -16,6 +22,23 @@ type Tree interface { Commit() ([]byte, error) GetProof(version uint64, key []byte) (*ics23.CommitmentProof, error) Prune(version uint64) error + Export(version uint64) (Exporter, error) + Import(version uint64) (Importer, error) + + io.Closer +} + +// Exporter is the interface that wraps the basic Export methods. +type Exporter interface { + Next() (*snapshottypes.SnapshotIAVLItem, error) + + io.Closer +} + +// Importer is the interface that wraps the basic Import methods. +type Importer interface { + Add(*snapshottypes.SnapshotIAVLItem) error + Commit() error io.Closer } diff --git a/store/pruning/manager_test.go b/store/pruning/manager_test.go index 7e4afa63de40..ed67dbfde39d 100644 --- a/store/pruning/manager_test.go +++ b/store/pruning/manager_test.go @@ -57,7 +57,6 @@ func (s *PruningTestSuite) TestPruning() { s.manager.Start() latestVersion := uint64(100) - kvCount := 10 // write batches for i := uint64(0); i < latestVersion; i++ { diff --git a/store/root/store_test.go b/store/root/store_test.go index 5d610b64c0ff..54c89c4e1f2f 100644 --- a/store/root/store_test.go +++ b/store/root/store_test.go @@ -32,7 +32,7 @@ func (s *RootStoreTestSuite) SetupTest() { s.Require().NoError(err) tree := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) - sc, err := commitment.NewCommitStore(map[string]commitment.Tree{"default": tree}, noopLog) + sc, err := commitment.NewCommitStore(map[string]commitment.Tree{defaultStoreKey: tree}, noopLog) s.Require().NoError(err) rs, err := New(noopLog, 1, ss, sc, nil) From c558a4073bc91a495faf5967f81601a3a41c334a Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Mon, 4 Dec 2023 09:11:25 -0500 Subject: [PATCH 07/10] storage abstraction layer --- store/commitment/store.go | 43 +++++------ store/commitment/store_test.go | 8 +- store/commitment/tree.go | 6 +- store/kv/branch/store_test.go | 10 ++- store/pruning/manager_test.go | 4 +- store/root/store_test.go | 4 +- store/snapshots/chunk.go | 8 +- store/snapshots/helpers_test.go | 50 ++++++------- store/snapshots/manager.go | 4 +- store/storage/database.go | 23 ++++++ store/storage/sqlite/db.go | 72 ++---------------- store/storage/store.go | 128 ++++++++++++++++++++++++++++++++ 12 files changed, 229 insertions(+), 131 deletions(-) create mode 100644 store/storage/database.go create mode 100644 store/storage/store.go diff --git a/store/commitment/store.go b/store/commitment/store.go index b9f6fd5bca39..54b9034d85a1 100644 --- a/store/commitment/store.go +++ b/store/commitment/store.go @@ -11,10 +11,11 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/v2" - snapshottypes "cosmossdk.io/store/v2/snapshots/types" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) var _ store.Committer = (*CommitStore)(nil) +var _ snapshotstypes.CommitSnapshotter = (*CommitStore)(nil) // CommitStore is a wrapper around multiple Tree objects mapped by a unique store // key. Each store key reflects dedicated and unique usage within a module. A caller @@ -131,7 +132,7 @@ func (c *CommitStore) Prune(version uint64) (ferr error) { return ferr } -// Snapshot implements snapshottypes.CommitSnapshotter. +// Snapshot implements snapshotstypes.CommitSnapshotter. func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error { if version == 0 { return fmt.Errorf("the snapshot version must be greater than 0") @@ -153,9 +154,9 @@ func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error defer exporter.Close() - err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ - Item: &snapshottypes.SnapshotItem_Store{ - Store: &snapshottypes.SnapshotStoreItem{ + err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_Store{ + Store: &snapshotstypes.SnapshotStoreItem{ Name: storeKey, }, }, @@ -172,8 +173,8 @@ func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error return fmt.Errorf("failed to get the next export node: %w", err) } - if err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ - Item: &snapshottypes.SnapshotItem_IAVL{ + if err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_IAVL{ IAVL: item, }, }); err != nil { @@ -185,50 +186,50 @@ func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error return nil } -// Restore implements snapshottypes.CommitSnapshotter. -func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshottypes.SnapshotItem, error) { +// Restore implements snapshotstypes.CommitSnapshotter. +func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshotstypes.SnapshotItem, error) { var ( importer Importer - snapshotItem snapshottypes.SnapshotItem + snapshotItem snapshotstypes.SnapshotItem storeKey string ) loop: for { - snapshotItem = snapshottypes.SnapshotItem{} + snapshotItem = snapshotstypes.SnapshotItem{} err := protoReader.ReadMsg(&snapshotItem) if errors.Is(err, io.EOF) { break } else if err != nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) + return snapshotstypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) } switch item := snapshotItem.Item.(type) { - case *snapshottypes.SnapshotItem_Store: + case *snapshotstypes.SnapshotItem_Store: if importer != nil { if err := importer.Commit(); err != nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) + return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) } importer.Close() } storeKey = item.Store.Name tree := c.multiTrees[storeKey] if tree == nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey) + return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey) } importer, err = tree.Import(version) if err != nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err) + return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err) } defer importer.Close() - case *snapshottypes.SnapshotItem_IAVL: + case *snapshotstypes.SnapshotItem_IAVL: if importer == nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") + return snapshotstypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") } node := item.IAVL if node.Height > int32(math.MaxInt8) { - return snapshottypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", + return snapshotstypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", item.IAVL.Height, math.MaxInt8) } // Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does @@ -249,7 +250,7 @@ loop: } err := importer.Add(node) if err != nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err) + return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err) } default: break loop @@ -258,7 +259,7 @@ loop: if importer != nil { if err := importer.Commit(); err != nil { - return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) + return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) } } diff --git a/store/commitment/store_test.go b/store/commitment/store_test.go index f9e699d922f1..fb65de845808 100644 --- a/store/commitment/store_test.go +++ b/store/commitment/store_test.go @@ -29,9 +29,9 @@ func TestSnapshotter(t *testing.T) { // latestHash := tree.WorkingHash() // // create a snapshot - // dummyExtensionItem := snapshottypes.SnapshotItem{ - // Item: &snapshottypes.SnapshotItem_Extension{ - // Extension: &snapshottypes.SnapshotExtensionMeta{ + // dummyExtensionItem := snapshotstypes.SnapshotItem{ + // Item: &snapshotstypes.SnapshotItem_Extension{ + // Extension: &snapshotstypes.SnapshotExtensionMeta{ // Name: "test", // Format: 1, // }, @@ -53,7 +53,7 @@ func TestSnapshotter(t *testing.T) { // streamReader, err := snapshots.NewStreamReader(chunks) // chStorage := make(chan *store.KVPair, 100) // require.NoError(t, err) - // nextItem, err := target.Restore(latestVersion, snapshottypes.CurrentFormat, streamReader, chStorage) + // nextItem, err := target.Restore(latestVersion, snapshotstypes.CurrentFormat, streamReader, chStorage) // require.NoError(t, err) // require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension()) diff --git a/store/commitment/tree.go b/store/commitment/tree.go index 86890e738e38..67acffdf3099 100644 --- a/store/commitment/tree.go +++ b/store/commitment/tree.go @@ -6,7 +6,7 @@ import ( ics23 "github.com/cosmos/ics23/go" - snapshottypes "cosmossdk.io/store/v2/snapshots/types" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) // ErrorExportDone is returned by Exporter.Next() when all items have been exported. @@ -30,14 +30,14 @@ type Tree interface { // Exporter is the interface that wraps the basic Export methods. type Exporter interface { - Next() (*snapshottypes.SnapshotIAVLItem, error) + Next() (*snapshotstypes.SnapshotIAVLItem, error) io.Closer } // Importer is the interface that wraps the basic Import methods. type Importer interface { - Add(*snapshottypes.SnapshotIAVLItem) error + Add(*snapshotstypes.SnapshotIAVLItem) error Commit() error io.Closer diff --git a/store/kv/branch/store_test.go b/store/kv/branch/store_test.go index 4353f280f65f..424e396e0b2e 100644 --- a/store/kv/branch/store_test.go +++ b/store/kv/branch/store_test.go @@ -8,6 +8,7 @@ import ( "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/kv/branch" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/sqlite" ) @@ -25,7 +26,8 @@ func TestStorageTestSuite(t *testing.T) { } func (s *StoreTestSuite) SetupTest() { - storage, err := sqlite.New(s.T().TempDir()) + sqliteDB, err := sqlite.New(s.T().TempDir()) + ss := storage.NewStorageStore(sqliteDB) s.Require().NoError(err) cs := store.NewChangeset(map[string]store.KVPairs{storeKey: {}}) @@ -36,12 +38,12 @@ func (s *StoreTestSuite) SetupTest() { cs.AddKVPair(storeKey, store.KVPair{Key: []byte(key), Value: []byte(val)}) } - s.Require().NoError(storage.ApplyChangeset(1, cs)) + s.Require().NoError(ss.ApplyChangeset(1, cs)) - kvStore, err := branch.New(storeKey, storage) + kvStore, err := branch.New(storeKey, ss) s.Require().NoError(err) - s.storage = storage + s.storage = ss s.kvStore = kvStore } diff --git a/store/pruning/manager_test.go b/store/pruning/manager_test.go index ed67dbfde39d..65e7fe1b09ec 100644 --- a/store/pruning/manager_test.go +++ b/store/pruning/manager_test.go @@ -11,6 +11,7 @@ import ( "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/commitment/iavl" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/sqlite" ) @@ -34,8 +35,9 @@ func (s *PruningTestSuite) SetupTest() { logger = log.NewTestLogger(s.T()) } - ss, err := sqlite.New(s.T().TempDir()) + sqliteDB, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) + ss := storage.NewStorageStore(sqliteDB) tree := iavl.NewIavlTree(dbm.NewMemDB(), log.NewNopLogger(), iavl.DefaultConfig()) sc, err := commitment.NewCommitStore(map[string]commitment.Tree{"default": tree}, logger) diff --git a/store/root/store_test.go b/store/root/store_test.go index 54c89c4e1f2f..edbbc6bdaf6e 100644 --- a/store/root/store_test.go +++ b/store/root/store_test.go @@ -12,6 +12,7 @@ import ( "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/commitment/iavl" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/sqlite" ) @@ -28,8 +29,9 @@ func TestStorageTestSuite(t *testing.T) { func (s *RootStoreTestSuite) SetupTest() { noopLog := log.NewNopLogger() - ss, err := sqlite.New(s.T().TempDir()) + sqliteDB, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) + ss := storage.NewStorageStore(sqliteDB) tree := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) sc, err := commitment.NewCommitStore(map[string]commitment.Tree{defaultStoreKey: tree}, noopLog) diff --git a/store/snapshots/chunk.go b/store/snapshots/chunk.go index c70fc074b0e0..874bf966871b 100644 --- a/store/snapshots/chunk.go +++ b/store/snapshots/chunk.go @@ -6,7 +6,7 @@ import ( "cosmossdk.io/errors" "cosmossdk.io/store/v2" - snapshottypes "cosmossdk.io/store/v2/snapshots/types" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) // ChunkWriter reads an input stream, splits it into fixed-size chunks, and writes them to a @@ -169,15 +169,15 @@ func DrainChunks(chunks <-chan io.ReadCloser) { // ValidRestoreHeight will check height is valid for snapshot restore or not func ValidRestoreHeight(format uint32, height uint64) error { - if format != snapshottypes.CurrentFormat { - return errors.Wrapf(snapshottypes.ErrUnknownFormat, "format %v", format) + if format != snapshotstypes.CurrentFormat { + return errors.Wrapf(snapshotstypes.ErrUnknownFormat, "format %v", format) } if height == 0 { return errors.Wrap(store.ErrLogic, "cannot restore snapshot at height 0") } if height > uint64(math.MaxInt64) { - return errors.Wrapf(snapshottypes.ErrInvalidMetadata, + return errors.Wrapf(snapshotstypes.ErrInvalidMetadata, "snapshot height %v cannot exceed %v", height, int64(math.MaxInt64)) } diff --git a/store/snapshots/helpers_test.go b/store/snapshots/helpers_test.go index 61cbb59022a6..68df5a1a5ed4 100644 --- a/store/snapshots/helpers_test.go +++ b/store/snapshots/helpers_test.go @@ -19,7 +19,7 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/snapshots" - snapshottypes "cosmossdk.io/store/v2/snapshots/types" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) func checksums(slice [][]byte) [][]byte { @@ -63,7 +63,7 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte { } // snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks. -func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]byte { +func snapshotItems(items [][]byte, ext snapshotstypes.ExtensionSnapshotter) [][]byte { // copy the same parameters from the code snapshotChunkSize := uint64(10e6) snapshotBufferSize := int(snapshotChunkSize) @@ -75,19 +75,19 @@ func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]b zWriter, _ := zlib.NewWriterLevel(bufWriter, 7) protoWriter := protoio.NewDelimitedWriter(zWriter) for _, item := range items { - _ = snapshottypes.WriteExtensionPayload(protoWriter, item) + _ = snapshotstypes.WriteExtensionPayload(protoWriter, item) } // write extension metadata - _ = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ - Item: &snapshottypes.SnapshotItem_Extension{ - Extension: &snapshottypes.SnapshotExtensionMeta{ + _ = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_Extension{ + Extension: &snapshotstypes.SnapshotExtensionMeta{ Name: ext.SnapshotName(), Format: ext.SnapshotFormat(), }, }, }) _ = ext.SnapshotExtension(0, func(payload []byte) error { - return snapshottypes.WriteExtensionPayload(protoWriter, payload) + return snapshotstypes.WriteExtensionPayload(protoWriter, payload) }) _ = protoWriter.Close() _ = bufWriter.Flush() @@ -112,15 +112,15 @@ type mockCommitSnapshotter struct { func (m *mockCommitSnapshotter) Restore( height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair, -) (snapshottypes.SnapshotItem, error) { +) (snapshotstypes.SnapshotItem, error) { if format == 0 { - return snapshottypes.SnapshotItem{}, snapshottypes.ErrUnknownFormat + return snapshotstypes.SnapshotItem{}, snapshotstypes.ErrUnknownFormat } if m.items != nil { - return snapshottypes.SnapshotItem{}, errors.New("already has contents") + return snapshotstypes.SnapshotItem{}, errors.New("already has contents") } - var item snapshottypes.SnapshotItem + var item snapshotstypes.SnapshotItem m.items = [][]byte{} for { item.Reset() @@ -128,7 +128,7 @@ func (m *mockCommitSnapshotter) Restore( if err == io.EOF { break } else if err != nil { - return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "invalid protobuf message") + return snapshotstypes.SnapshotItem{}, errorsmod.Wrap(err, "invalid protobuf message") } payload := item.GetExtensionPayload() if payload == nil { @@ -142,7 +142,7 @@ func (m *mockCommitSnapshotter) Restore( func (m *mockCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { for _, item := range m.items { - if err := snapshottypes.WriteExtensionPayload(protoWriter, item); err != nil { + if err := snapshotstypes.WriteExtensionPayload(protoWriter, item); err != nil { return err } } @@ -150,11 +150,11 @@ func (m *mockCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writ } func (m *mockCommitSnapshotter) SnapshotFormat() uint32 { - return snapshottypes.CurrentFormat + return snapshotstypes.CurrentFormat } func (m *mockCommitSnapshotter) SupportedFormats() []uint32 { - return []uint32{snapshottypes.CurrentFormat} + return []uint32{snapshotstypes.CurrentFormat} } type mockStorageSnapshotter struct{} @@ -165,7 +165,7 @@ func (m *mockStorageSnapshotter) Restore(version uint64, chStorage <-chan *store type mockErrorCommitSnapshotter struct{} -var _ snapshottypes.CommitSnapshotter = (*mockErrorCommitSnapshotter)(nil) +var _ snapshotstypes.CommitSnapshotter = (*mockErrorCommitSnapshotter)(nil) func (m *mockErrorCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { return errors.New("mock snapshot error") @@ -173,16 +173,16 @@ func (m *mockErrorCommitSnapshotter) Snapshot(height uint64, protoWriter protoio func (m *mockErrorCommitSnapshotter) Restore( height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair, -) (snapshottypes.SnapshotItem, error) { - return snapshottypes.SnapshotItem{}, errors.New("mock restore error") +) (snapshotstypes.SnapshotItem, error) { + return snapshotstypes.SnapshotItem{}, errors.New("mock restore error") } func (m *mockErrorCommitSnapshotter) SnapshotFormat() uint32 { - return snapshottypes.CurrentFormat + return snapshotstypes.CurrentFormat } func (m *mockErrorCommitSnapshotter) SupportedFormats() []uint32 { - return []uint32{snapshottypes.CurrentFormat} + return []uint32{snapshotstypes.CurrentFormat} } // setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1. @@ -237,7 +237,7 @@ func (m *hungCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writ func (m *hungCommitSnapshotter) Restore( height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair, -) (snapshottypes.SnapshotItem, error) { +) (snapshotstypes.SnapshotItem, error) { panic("not implemented") } @@ -267,16 +267,16 @@ func (s *extSnapshotter) SupportedFormats() []uint32 { return []uint32{1} } -func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshottypes.ExtensionPayloadWriter) error { +func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshotstypes.ExtensionPayloadWriter) error { for _, i := range s.state { - if err := payloadWriter(snapshottypes.Uint64ToBigEndian(i)); err != nil { + if err := payloadWriter(snapshotstypes.Uint64ToBigEndian(i)); err != nil { return err } } return nil } -func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshottypes.ExtensionPayloadReader) error { +func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshotstypes.ExtensionPayloadReader) error { for { payload, err := payloadReader() if err == io.EOF { @@ -284,7 +284,7 @@ func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadR } else if err != nil { return err } - s.state = append(s.state, snapshottypes.BigEndianToUint64(payload)) + s.state = append(s.state, snapshotstypes.BigEndianToUint64(payload)) } // finalize restoration return nil diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index abdf545d4096..7eab23cfc489 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -166,7 +166,7 @@ func (m *Manager) GetSnapshotBlockRetentionHeights() int64 { // Create creates a snapshot and returns its metadata. func (m *Manager) Create(height uint64) (*types.Snapshot, error) { if m == nil { - return nil, errorsmod.Wrap(store.ErrLogic, "Snatshot Manager is nil") + return nil, errorsmod.Wrap(store.ErrLogic, "Snapshot Manager is nil") } err := m.begin(opSnapshot) @@ -372,11 +372,11 @@ func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io. storageErrs := make(chan error, 1) go func() { + defer close(storageErrs) err := m.storageSnapshotter.Restore(snapshot.Height, chStorage) if err != nil { storageErrs <- err } - close(storageErrs) }() nextItem, err = m.commitSnapshotter.Restore(snapshot.Height, snapshot.Format, streamReader, chStorage) diff --git a/store/storage/database.go b/store/storage/database.go new file mode 100644 index 000000000000..ff777c2f563c --- /dev/null +++ b/store/storage/database.go @@ -0,0 +1,23 @@ +package storage + +import ( + "io" + + "cosmossdk.io/store/v2" +) + +// Database is an interface that wraps the storage database methods. +type Database interface { + NewBatch(version uint64) (store.Batch, error) + Has(storeKey string, version uint64, key []byte) (bool, error) + Get(storeKey string, version uint64, key []byte) ([]byte, error) + GetLatestVersion() (uint64, error) + SetLatestVersion(version uint64) error + + Iterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) + ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) + + Prune(version uint64) error + + io.Closer +} diff --git a/store/storage/sqlite/db.go b/store/storage/sqlite/db.go index 805d15a72809..05fde45b03fd 100644 --- a/store/storage/sqlite/db.go +++ b/store/storage/sqlite/db.go @@ -11,7 +11,7 @@ import ( _ "github.com/mattn/go-sqlite3" "cosmossdk.io/store/v2" - snapshottypes "cosmossdk.io/store/v2/snapshots/types" + "cosmossdk.io/store/v2/storage" ) const ( @@ -21,9 +21,6 @@ const ( keyLatestHeight = "latest_height" keyPruneHeight = "prune_height" - // TODO: it is a random number, need to be tuned - defaultBatchBufferSize = 100000 - reservedUpsertStmt = ` INSERT INTO state_storage(store_key, key, value, version) VALUES(?, ?, ?, ?) @@ -44,9 +41,7 @@ const ( ` ) -var _ store.VersionedDatabase = (*Database)(nil) - -var _ snapshottypes.StorageSnapshotter = (*Database)(nil) +var _ storage.Database = (*Database)(nil) type Database struct { storage *sql.DB @@ -97,6 +92,10 @@ func (db *Database) Close() error { return err } +func (db *Database) NewBatch(version uint64) (store.Batch, error) { + return NewBatch(db.storage, version) +} + func (db *Database) GetLatestVersion() (uint64, error) { stmt, err := db.storage.Prepare("SELECT value FROM state_storage WHERE store_key = ? AND key = ?") if err != nil { @@ -174,29 +173,6 @@ func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]by return nil, nil } -func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error { - b, err := NewBatch(db.storage, version) - if err != nil { - return err - } - - for storeKey, pairs := range cs.Pairs { - for _, kvPair := range pairs { - if kvPair.Value == nil { - if err := b.Delete(storeKey, kvPair.Key); err != nil { - return err - } - } else { - if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil { - return err - } - } - } - } - - return b.Write() -} - // Prune removes all versions of all keys that are <= the given version. It keeps // the latest (non-tombstoned) version of each key/value tuple to handle queries // above the prune version. This is analogous to RocksDB full_history_ts_low. @@ -262,42 +238,6 @@ func (db *Database) ReverseIterator(storeKey string, version uint64, start, end return newIterator(db, storeKey, version, start, end, true) } -// Restore implements the StorageSnapshotter interface. -func (db *Database) Restore(version uint64, chStorage <-chan *store.KVPair) error { - latestVersion, err := db.GetLatestVersion() - if err != nil { - return fmt.Errorf("failed to get latest version: %w", err) - } - if version <= latestVersion { - return fmt.Errorf("the snapshot version %d is not greater than latest version %d", version, latestVersion) - } - - b, err := NewBatch(db.storage, version) - if err != nil { - return err - } - - for kvPair := range chStorage { - if err := b.Set(kvPair.StoreKey, kvPair.Key, kvPair.Value); err != nil { - return err - } - - if b.Size() > defaultBatchBufferSize { - if err := b.Write(); err != nil { - return err - } - } - } - - if b.Size() > 0 { - if err := b.Write(); err != nil { - return err - } - } - - return db.SetLatestVersion(version) -} - func (db *Database) PrintRowsDebug() { stmt, err := db.storage.Prepare("SELECT store_key, key, value, version, tombstone FROM state_storage") if err != nil { diff --git a/store/storage/store.go b/store/storage/store.go new file mode 100644 index 000000000000..cf15bf923a3e --- /dev/null +++ b/store/storage/store.go @@ -0,0 +1,128 @@ +package storage + +import ( + "fmt" + + "cosmossdk.io/store/v2" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" +) + +const ( + // TODO: it is a random number, need to be tuned + defaultBatchBufferSize = 100000 +) + +var _ store.VersionedDatabase = (*StorageStore)(nil) +var _ snapshotstypes.StorageSnapshotter = (*StorageStore)(nil) + +// StorageStore is a wrapper around the store.VersionedDatabase interface. +type StorageStore struct { + db Database +} + +// NewStorageStore returns a reference to a new StorageStore. +func NewStorageStore(db Database) *StorageStore { + return &StorageStore{ + db: db, + } +} + +// Has returns true if the key exists in the store. +func (ss *StorageStore) Has(storeKey string, version uint64, key []byte) (bool, error) { + return ss.db.Has(storeKey, version, key) +} + +// Get returns the value associated with the given key. +func (ss *StorageStore) Get(storeKey string, version uint64, key []byte) ([]byte, error) { + return ss.db.Get(storeKey, version, key) +} + +// ApplyChangeset applies the given changeset to the storage. +func (ss *StorageStore) ApplyChangeset(version uint64, cs *store.Changeset) error { + b, err := ss.db.NewBatch(version) + if err != nil { + return err + } + + for storeKey, pairs := range cs.Pairs { + for _, kvPair := range pairs { + if kvPair.Value == nil { + if err := b.Delete(storeKey, kvPair.Key); err != nil { + return err + } + } else { + if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil { + return err + } + } + } + } + + return b.Write() +} + +// GetLatestVersion returns the latest version of the store. +func (ss *StorageStore) GetLatestVersion() (uint64, error) { + return ss.db.GetLatestVersion() +} + +// SetLatestVersion sets the latest version of the store. +func (ss *StorageStore) SetLatestVersion(version uint64) error { + return ss.db.SetLatestVersion(version) +} + +// Iterator returns an iterator over the specified domain and prefix. +func (ss *StorageStore) Iterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) { + return ss.db.Iterator(storeKey, version, start, end) +} + +// ReverseIterator returns an iterator over the specified domain and prefix in reverse. +func (ss *StorageStore) ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) { + return ss.db.ReverseIterator(storeKey, version, start, end) +} + +// Prune prunes the store up to the given version. +func (ss *StorageStore) Prune(version uint64) error { + return ss.db.Prune(version) +} + +// Restore restores the store from the given channel. +func (ss *StorageStore) Restore(version uint64, chStorage <-chan *store.KVPair) error { + latestVersion, err := ss.db.GetLatestVersion() + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + if version <= latestVersion { + return fmt.Errorf("the snapshot version %d is not greater than latest version %d", version, latestVersion) + } + + b, err := ss.db.NewBatch(version) + if err != nil { + return err + } + + for kvPair := range chStorage { + if err := b.Set(kvPair.StoreKey, kvPair.Key, kvPair.Value); err != nil { + return err + } + + if b.Size() > defaultBatchBufferSize { + if err := b.Write(); err != nil { + return err + } + } + } + + if b.Size() > 0 { + if err := b.Write(); err != nil { + return err + } + } + + return ss.db.SetLatestVersion(version) +} + +// Close closes the store. +func (ss *StorageStore) Close() error { + return ss.db.Close() +} From d03866bd393f6958d4b950464ac96a548aad5e5c Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Tue, 5 Dec 2023 16:02:36 -0500 Subject: [PATCH 08/10] test --- store/commitment/iavl/exporter.go | 40 +++++++ store/commitment/iavl/importer.go | 34 ++++++ store/commitment/iavl/tree.go | 69 ++---------- store/commitment/iavl/tree_test.go | 18 +++ store/commitment/store.go | 69 +++++++----- store/commitment/store_test.go | 65 ----------- store/commitment/store_test_suite.go | 121 +++++++++++++++++++++ store/snapshots/helpers_test.go | 8 +- store/snapshots/manager.go | 16 +-- store/snapshots/{types => }/snapshotter.go | 5 +- store/storage/pebbledb/db.go | 35 ++---- store/storage/pebbledb/db_test.go | 2 +- store/storage/rocksdb/db.go | 32 ++---- store/storage/rocksdb/db_test.go | 3 +- store/storage/sqlite/db_test.go | 24 ++-- store/storage/storage_bench_test.go | 16 ++- store/storage/store.go | 8 +- 17 files changed, 332 insertions(+), 233 deletions(-) create mode 100644 store/commitment/iavl/exporter.go create mode 100644 store/commitment/iavl/importer.go delete mode 100644 store/commitment/store_test.go create mode 100644 store/commitment/store_test_suite.go rename store/snapshots/{types => }/snapshotter.go (94%) diff --git a/store/commitment/iavl/exporter.go b/store/commitment/iavl/exporter.go new file mode 100644 index 000000000000..20f00d1a1722 --- /dev/null +++ b/store/commitment/iavl/exporter.go @@ -0,0 +1,40 @@ +package iavl + +import ( + "errors" + + "github.com/cosmos/iavl" + + "cosmossdk.io/store/v2/commitment" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" +) + +// Exporter is a wrapper around iavl.Exporter. +type Exporter struct { + exporter *iavl.Exporter +} + +// Next returns the next item in the exporter. +func (e *Exporter) Next() (*snapshotstypes.SnapshotIAVLItem, error) { + item, err := e.exporter.Next() + if err != nil { + if errors.Is(err, iavl.ErrorExportDone) { + return nil, commitment.ErrorExportDone + } + return nil, err + } + + return &snapshotstypes.SnapshotIAVLItem{ + Key: item.Key, + Value: item.Value, + Version: item.Version, + Height: int32(item.Height), + }, nil +} + +// Close closes the exporter. +func (e *Exporter) Close() error { + e.exporter.Close() + + return nil +} diff --git a/store/commitment/iavl/importer.go b/store/commitment/iavl/importer.go new file mode 100644 index 000000000000..6f1b0eedf21f --- /dev/null +++ b/store/commitment/iavl/importer.go @@ -0,0 +1,34 @@ +package iavl + +import ( + "github.com/cosmos/iavl" + + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" +) + +// Importer is a wrapper around iavl.Importer. +type Importer struct { + importer *iavl.Importer +} + +// Add adds the given item to the importer. +func (i *Importer) Add(item *snapshotstypes.SnapshotIAVLItem) error { + return i.importer.Add(&iavl.ExportNode{ + Key: item.Key, + Value: item.Value, + Version: item.Version, + Height: int8(item.Height), + }) +} + +// Commit commits the importer. +func (i *Importer) Commit() error { + return i.importer.Commit() +} + +// Close closes the importer. +func (i *Importer) Close() error { + i.importer.Close() + + return nil +} diff --git a/store/commitment/iavl/tree.go b/store/commitment/iavl/tree.go index 755c0743d943..419240b5b365 100644 --- a/store/commitment/iavl/tree.go +++ b/store/commitment/iavl/tree.go @@ -1,7 +1,6 @@ package iavl import ( - "errors" "fmt" dbm "github.com/cosmos/cosmos-db" @@ -10,7 +9,6 @@ import ( log "cosmossdk.io/log" "cosmossdk.io/store/v2/commitment" - snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) var _ commitment.Tree = (*IavlTree)(nil) @@ -86,77 +84,28 @@ func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) { return nil, err } exporter, err := tree.Export() + if err != nil { + return nil, err + } + return &Exporter{ exporter: exporter, - }, err + }, nil } // Import imports the tree importer at the given version. func (t *IavlTree) Import(version uint64) (commitment.Importer, error) { importer, err := t.tree.Import(int64(version)) - return &Importer{ - importer: importer, - }, err -} - -// Close closes the iavl tree. -func (t *IavlTree) Close() error { - return nil -} - -// Exporter is a wrapper around iavl.Exporter. -type Exporter struct { - exporter *iavl.Exporter -} - -// Next returns the next item in the exporter. -func (e *Exporter) Next() (*snapshotstypes.SnapshotIAVLItem, error) { - item, err := e.exporter.Next() if err != nil { - if errors.Is(err, iavl.ErrorExportDone) { - return nil, commitment.ErrorExportDone - } return nil, err } - return &snapshotstypes.SnapshotIAVLItem{ - Key: item.Key, - Value: item.Value, - Version: item.Version, - Height: int32(item.Height), + return &Importer{ + importer: importer, }, nil } -// Close closes the exporter. -func (e *Exporter) Close() error { - e.exporter.Close() - - return nil -} - -// Importer is a wrapper around iavl.Importer. -type Importer struct { - importer *iavl.Importer -} - -// Add adds the given item to the importer. -func (i *Importer) Add(item *snapshotstypes.SnapshotIAVLItem) error { - return i.importer.Add(&iavl.ExportNode{ - Key: item.Key, - Value: item.Value, - Version: item.Version, - Height: int8(item.Height), - }) -} - -// Commit commits the importer. -func (i *Importer) Commit() error { - return i.importer.Commit() -} - -// Close closes the importer. -func (i *Importer) Close() error { - i.importer.Close() - +// Close closes the iavl tree. +func (t *IavlTree) Close() error { return nil } diff --git a/store/commitment/iavl/tree_test.go b/store/commitment/iavl/tree_test.go index 8c61291ba7aa..a1ea79bcc7c8 100644 --- a/store/commitment/iavl/tree_test.go +++ b/store/commitment/iavl/tree_test.go @@ -5,10 +5,28 @@ import ( dbm "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "cosmossdk.io/log" + "cosmossdk.io/store/v2/commitment" ) +func TestCommitterSuite(t *testing.T) { + s := &commitment.CommitStoreTestSuite{ + NewStore: func(db dbm.DB, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) { + multiTrees := make(map[string]commitment.Tree) + cfg := DefaultConfig() + for _, storeKey := range storeKeys { + prefixDB := dbm.NewPrefixDB(db, []byte(storeKey)) + multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg) + } + return commitment.NewCommitStore(multiTrees, logger) + }, + } + + suite.Run(t, s) +} + func generateTree() *IavlTree { cfg := DefaultConfig() db := dbm.NewMemDB() diff --git a/store/commitment/store.go b/store/commitment/store.go index 54b9034d85a1..ad391b1df3a2 100644 --- a/store/commitment/store.go +++ b/store/commitment/store.go @@ -11,11 +11,14 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots" snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) -var _ store.Committer = (*CommitStore)(nil) -var _ snapshotstypes.CommitSnapshotter = (*CommitStore)(nil) +var ( + _ store.Committer = (*CommitStore)(nil) + _ snapshots.CommitSnapshotter = (*CommitStore)(nil) +) // CommitStore is a wrapper around multiple Tree objects mapped by a unique store // key. Each store key reflects dedicated and unique usage within a module. A caller @@ -147,39 +150,45 @@ func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error } for storeKey, tree := range c.multiTrees { - exporter, err := tree.Export(version) - if err != nil { - return fmt.Errorf("failed to export tree for version %d: %w", version, err) - } - - defer exporter.Close() + // TODO: check the parallelism of this loop + if err := func() error { + exporter, err := tree.Export(version) + if err != nil { + return fmt.Errorf("failed to export tree for version %d: %w", version, err) + } + defer exporter.Close() - err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ - Item: &snapshotstypes.SnapshotItem_Store{ - Store: &snapshotstypes.SnapshotStoreItem{ - Name: storeKey, + err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_Store{ + Store: &snapshotstypes.SnapshotStoreItem{ + Name: storeKey, + }, }, - }, - }) - if err != nil { - return fmt.Errorf("failed to write store name: %w", err) - } - - for { - item, err := exporter.Next() - if errors.Is(err, ErrorExportDone) { - break - } else if err != nil { - return fmt.Errorf("failed to get the next export node: %w", err) + }) + if err != nil { + return fmt.Errorf("failed to write store name: %w", err) } - if err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ - Item: &snapshotstypes.SnapshotItem_IAVL{ - IAVL: item, - }, - }); err != nil { - return fmt.Errorf("failed to write iavl node: %w", err) + for { + item, err := exporter.Next() + if errors.Is(err, ErrorExportDone) { + break + } else if err != nil { + return fmt.Errorf("failed to get the next export node: %w", err) + } + + if err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_IAVL{ + IAVL: item, + }, + }); err != nil { + return fmt.Errorf("failed to write iavl node: %w", err) + } } + + return nil + }(); err != nil { + return err } } diff --git a/store/commitment/store_test.go b/store/commitment/store_test.go deleted file mode 100644 index fb65de845808..000000000000 --- a/store/commitment/store_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package commitment - -import ( - "testing" -) - -func TestSnapshotter(t *testing.T) { - // generate a new tree - // storeKey := "store" - // tree := generateTree(storeKey) - // require.NotNil(t, tree) - - // latestVersion := uint64(10) - // kvCount := 10 - // for i := uint64(1); i <= latestVersion; i++ { - // cs := store.NewChangeset() - // for j := 0; j < kvCount; j++ { - // key := []byte(fmt.Sprintf("key-%d-%d", i, j)) - // value := []byte(fmt.Sprintf("value-%d-%d", i, j)) - // cs.Add(key, value) - // } - // err := tree.WriteBatch(cs) - // require.NoError(t, err) - - // _, err = tree.Commit() - // require.NoError(t, err) - // } - - // latestHash := tree.WorkingHash() - - // // create a snapshot - // dummyExtensionItem := snapshotstypes.SnapshotItem{ - // Item: &snapshotstypes.SnapshotItem_Extension{ - // Extension: &snapshotstypes.SnapshotExtensionMeta{ - // Name: "test", - // Format: 1, - // }, - // }, - // } - // target := generateTree("") - // chunks := make(chan io.ReadCloser, kvCount*int(latestVersion)) - // go func() { - // streamWriter := snapshots.NewStreamWriter(chunks) - // require.NotNil(t, streamWriter) - // defer streamWriter.Close() - // err := tree.Snapshot(latestVersion, streamWriter) - // require.NoError(t, err) - // // write an extension metadata - // err = streamWriter.WriteMsg(&dummyExtensionItem) - // require.NoError(t, err) - // }() - - // streamReader, err := snapshots.NewStreamReader(chunks) - // chStorage := make(chan *store.KVPair, 100) - // require.NoError(t, err) - // nextItem, err := target.Restore(latestVersion, snapshotstypes.CurrentFormat, streamReader, chStorage) - // require.NoError(t, err) - // require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension()) - - // // check the store key - // require.Equal(t, storeKey, target.storeKey) - - // // check the restored tree hash - // require.Equal(t, latestHash, target.WorkingHash()) -} diff --git a/store/commitment/store_test_suite.go b/store/commitment/store_test_suite.go new file mode 100644 index 000000000000..370958cb3f10 --- /dev/null +++ b/store/commitment/store_test_suite.go @@ -0,0 +1,121 @@ +package commitment + +import ( + "fmt" + "io" + "sync" + + dbm "github.com/cosmos/cosmos-db" + "github.com/stretchr/testify/suite" + + "cosmossdk.io/log" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" +) + +const ( + storeKey1 = "store1" + storeKey2 = "store2" +) + +// CommitStoreTestSuite is a test suite to be used for all tree backends. +type CommitStoreTestSuite struct { + suite.Suite + + NewStore func(db dbm.DB, storeKeys []string, logger log.Logger) (*CommitStore, error) +} + +func (s *CommitStoreTestSuite) TestSnapshotter() { + storeKeys := []string{storeKey1, storeKey2} + commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger()) + s.Require().NoError(err) + + latestVersion := uint64(10) + kvCount := 10 + for i := uint64(1); i <= latestVersion; i++ { + kvPairs := make(map[string]store.KVPairs) + for _, storeKey := range storeKeys { + kvPairs[storeKey] = store.KVPairs{} + for j := 0; j < kvCount; j++ { + key := []byte(fmt.Sprintf("key-%d-%d", i, j)) + value := []byte(fmt.Sprintf("value-%d-%d", i, j)) + kvPairs[storeKey] = append(kvPairs[storeKey], store.KVPair{Key: key, Value: value}) + } + } + s.Require().NoError(commitStore.WriteBatch(store.NewChangeset(kvPairs))) + + _, err = commitStore.Commit() + s.Require().NoError(err) + } + + latestStoreInfos := commitStore.WorkingStoreInfos(latestVersion) + s.Require().Equal(len(storeKeys), len(latestStoreInfos)) + + // create a snapshot + dummyExtensionItem := snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_Extension{ + Extension: &snapshotstypes.SnapshotExtensionMeta{ + Name: "test", + Format: 1, + }, + }, + } + + targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger()) + s.Require().NoError(err) + + chunks := make(chan io.ReadCloser, kvCount*int(latestVersion)) + go func() { + streamWriter := snapshots.NewStreamWriter(chunks) + s.Require().NotNil(streamWriter) + defer streamWriter.Close() + err := commitStore.Snapshot(latestVersion, streamWriter) + s.Require().NoError(err) + // write an extension metadata + err = streamWriter.WriteMsg(&dummyExtensionItem) + s.Require().NoError(err) + }() + + streamReader, err := snapshots.NewStreamReader(chunks) + s.Require().NoError(err) + chStorage := make(chan *store.KVPair, 100) + leaves := make(map[string]string) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for kv := range chStorage { + leaves[fmt.Sprintf("%s_%s", kv.StoreKey, kv.Key)] = string(kv.Value) + } + wg.Done() + }() + nextItem, err := targetStore.Restore(latestVersion, snapshotstypes.CurrentFormat, streamReader, chStorage) + s.Require().NoError(err) + s.Require().Equal(*dummyExtensionItem.GetExtension(), *nextItem.GetExtension()) + + close(chStorage) + wg.Wait() + s.Require().Equal(len(storeKeys)*kvCount*int(latestVersion), len(leaves)) + for _, storeKey := range storeKeys { + for i := 1; i <= int(latestVersion); i++ { + for j := 0; j < kvCount; j++ { + key := fmt.Sprintf("%s_key-%d-%d", storeKey, i, j) + s.Require().Equal(leaves[key], fmt.Sprintf("value-%d-%d", i, j)) + } + } + } + + // check the restored tree hash + targetStoreInfos := targetStore.WorkingStoreInfos(latestVersion) + s.Require().Equal(len(storeKeys), len(targetStoreInfos)) + for _, storeInfo := range targetStoreInfos { + matched := false + for _, latestStoreInfo := range latestStoreInfos { + if storeInfo.Name == latestStoreInfo.Name { + s.Require().Equal(latestStoreInfo.GetHash(), storeInfo.GetHash()) + matched = true + } + } + s.Require().True(matched) + } +} diff --git a/store/snapshots/helpers_test.go b/store/snapshots/helpers_test.go index 68df5a1a5ed4..9711ab538347 100644 --- a/store/snapshots/helpers_test.go +++ b/store/snapshots/helpers_test.go @@ -63,7 +63,7 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte { } // snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks. -func snapshotItems(items [][]byte, ext snapshotstypes.ExtensionSnapshotter) [][]byte { +func snapshotItems(items [][]byte, ext snapshots.ExtensionSnapshotter) [][]byte { // copy the same parameters from the code snapshotChunkSize := uint64(10e6) snapshotBufferSize := int(snapshotChunkSize) @@ -165,7 +165,7 @@ func (m *mockStorageSnapshotter) Restore(version uint64, chStorage <-chan *store type mockErrorCommitSnapshotter struct{} -var _ snapshotstypes.CommitSnapshotter = (*mockErrorCommitSnapshotter)(nil) +var _ snapshots.CommitSnapshotter = (*mockErrorCommitSnapshotter)(nil) func (m *mockErrorCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { return errors.New("mock snapshot error") @@ -267,7 +267,7 @@ func (s *extSnapshotter) SupportedFormats() []uint32 { return []uint32{1} } -func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshotstypes.ExtensionPayloadWriter) error { +func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshots.ExtensionPayloadWriter) error { for _, i := range s.state { if err := payloadWriter(snapshotstypes.Uint64ToBigEndian(i)); err != nil { return err @@ -276,7 +276,7 @@ func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshot return nil } -func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshotstypes.ExtensionPayloadReader) error { +func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshots.ExtensionPayloadReader) error { for { payload, err := payloadReader() if err == io.EOF { diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 7eab23cfc489..d55d487bb125 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -31,14 +31,14 @@ import ( // 2. io.ReadCloser streams automatically propagate IO errors, and can pass arbitrary // errors via io.Pipe.CloseWithError(). type Manager struct { - extensions map[string]types.ExtensionSnapshotter + extensions map[string]ExtensionSnapshotter // store is the snapshot store where all completed snapshots are persisted. store *Store opts types.SnapshotOptions // commitSnapshotter is the snapshotter for the commitment state. - commitSnapshotter types.CommitSnapshotter + commitSnapshotter CommitSnapshotter // storageSnapshotter is the snapshotter for the storage state. - storageSnapshotter types.StorageSnapshotter + storageSnapshotter StorageSnapshotter logger log.Logger @@ -75,9 +75,9 @@ const ( var ErrOptsZeroSnapshotInterval = errors.New("snaphot-interval must not be 0") // NewManager creates a new manager. -func NewManager(store *Store, opts types.SnapshotOptions, commitSnapshotter types.CommitSnapshotter, storageSnapshotter types.StorageSnapshotter, extensions map[string]types.ExtensionSnapshotter, logger log.Logger) *Manager { +func NewManager(store *Store, opts types.SnapshotOptions, commitSnapshotter CommitSnapshotter, storageSnapshotter StorageSnapshotter, extensions map[string]ExtensionSnapshotter, logger log.Logger) *Manager { if extensions == nil { - extensions = map[string]types.ExtensionSnapshotter{} + extensions = map[string]ExtensionSnapshotter{} } return &Manager{ store: store, @@ -90,9 +90,9 @@ func NewManager(store *Store, opts types.SnapshotOptions, commitSnapshotter type } // RegisterExtensions register extension snapshotters to manager -func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error { +func (m *Manager) RegisterExtensions(extensions ...ExtensionSnapshotter) error { if m.extensions == nil { - m.extensions = make(map[string]types.ExtensionSnapshotter, len(extensions)) + m.extensions = make(map[string]ExtensionSnapshotter, len(extensions)) } for _, extension := range extensions { name := extension.SnapshotName() @@ -517,7 +517,7 @@ func (m *Manager) sortedExtensionNames() []string { } // IsFormatSupported returns if the snapshotter supports restoration from given format. -func IsFormatSupported(snapshotter types.ExtensionSnapshotter, format uint32) bool { +func IsFormatSupported(snapshotter ExtensionSnapshotter, format uint32) bool { for _, i := range snapshotter.SupportedFormats() { if i == format { return true diff --git a/store/snapshots/types/snapshotter.go b/store/snapshots/snapshotter.go similarity index 94% rename from store/snapshots/types/snapshotter.go rename to store/snapshots/snapshotter.go index ed64b8960e06..7c8321f3c838 100644 --- a/store/snapshots/types/snapshotter.go +++ b/store/snapshots/snapshotter.go @@ -1,9 +1,10 @@ -package types +package snapshots import ( protoio "github.com/cosmos/gogoproto/io" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots/types" ) // CommitSnapshotter defines an API for creating and restoring snapshots of the @@ -13,7 +14,7 @@ type CommitSnapshotter interface { Snapshot(version uint64, protoWriter protoio.Writer) error // Restore restores the commitment state from the snapshot reader. - Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (SnapshotItem, error) + Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (types.SnapshotItem, error) } // StorageSnapshotter defines an API for restoring snapshots of the storage state. diff --git a/store/storage/pebbledb/db.go b/store/storage/pebbledb/db.go index 4e61857f9352..42b9a2812ca4 100644 --- a/store/storage/pebbledb/db.go +++ b/store/storage/pebbledb/db.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/pebble" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/storage" ) const ( @@ -25,7 +26,7 @@ const ( tombstoneVal = "TOMBSTONE" ) -var _ store.VersionedDatabase = (*Database)(nil) +var _ storage.Database = (*Database)(nil) type Database struct { storage *pebble.DB @@ -92,6 +93,15 @@ func (db *Database) Close() error { return err } +func (db *Database) NewBatch(version uint64) (store.Batch, error) { + b, err := NewBatch(db.storage, version, db.sync) + if err != nil { + return nil, err + } + + return b, nil +} + func (db *Database) SetLatestVersion(version uint64) error { var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], version) @@ -175,29 +185,6 @@ func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]by return nil, nil } -func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error { - b, err := NewBatch(db.storage, version, db.sync) - if err != nil { - return err - } - - for storeKey, pairs := range cs.Pairs { - for _, kvPair := range pairs { - if kvPair.Value == nil { - if err := b.Delete(storeKey, kvPair.Key); err != nil { - return err - } - } else { - if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil { - return err - } - } - } - } - - return b.Write() -} - // Prune removes all versions of all keys that are <= the given version. // // Note, the implementation of this method is inefficient and can be potentially diff --git a/store/storage/pebbledb/db_test.go b/store/storage/pebbledb/db_test.go index 934660042167..a8310386bc37 100644 --- a/store/storage/pebbledb/db_test.go +++ b/store/storage/pebbledb/db_test.go @@ -19,7 +19,7 @@ func TestStorageTestSuite(t *testing.T) { db.SetSync(false) } - return db, err + return storage.NewStorageStore(db), err }, EmptyBatchSize: 12, } diff --git a/store/storage/rocksdb/db.go b/store/storage/rocksdb/db.go index d73fd29be5ee..8ae5146d7b12 100644 --- a/store/storage/rocksdb/db.go +++ b/store/storage/rocksdb/db.go @@ -12,6 +12,7 @@ import ( "golang.org/x/exp/slices" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/util" ) @@ -23,7 +24,7 @@ const ( ) var ( - _ store.VersionedDatabase = (*Database)(nil) + _ storage.Database = (*Database)(nil) defaultWriteOpts = grocksdb.NewDefaultWriteOptions() defaultReadOpts = grocksdb.NewDefaultReadOptions() @@ -90,6 +91,15 @@ func (db *Database) Close() error { return nil } +func (db *Database) NewBatch(version uint64) (store.Batch, error) { + batch, err := NewBatch(db, version) + if err != nil { + return nil, err + } + + return batch, nil +} + func (db *Database) getSlice(storeKey string, version uint64, key []byte) (*grocksdb.Slice, error) { if version < db.tsLow { return nil, store.ErrVersionPruned{EarliestVersion: db.tsLow} @@ -141,26 +151,6 @@ func (db *Database) Get(storeKey string, version uint64, key []byte) ([]byte, er return copyAndFreeSlice(slice), nil } -func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error { - b := NewBatch(db, version) - - for storeKey, pairs := range cs.Pairs { - for _, kvPair := range pairs { - if kvPair.Value == nil { - if err := b.Delete(storeKey, kvPair.Key); err != nil { - return err - } - } else { - if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil { - return err - } - } - } - } - - return b.Write() -} - // Prune attempts to prune all versions up to and including the provided version. // This is done internally by updating the full_history_ts_low RocksDB value on // the column families, s.t. all versions less than full_history_ts_low will be diff --git a/store/storage/rocksdb/db_test.go b/store/storage/rocksdb/db_test.go index c1d2868cc68f..9810bd08bb20 100644 --- a/store/storage/rocksdb/db_test.go +++ b/store/storage/rocksdb/db_test.go @@ -21,7 +21,8 @@ const ( func TestStorageTestSuite(t *testing.T) { s := &storage.StorageTestSuite{ NewDB: func(dir string) (store.VersionedDatabase, error) { - return New(dir) + db, err := New(dir) + return storage.NewStorageStore(db), err }, EmptyBatchSize: 12, } diff --git a/store/storage/sqlite/db_test.go b/store/storage/sqlite/db_test.go index f80f88baf7e0..deff98ec4348 100644 --- a/store/storage/sqlite/db_test.go +++ b/store/storage/sqlite/db_test.go @@ -19,7 +19,8 @@ const ( func TestStorageTestSuite(t *testing.T) { s := &storage.StorageTestSuite{ NewDB: func(dir string) (store.VersionedDatabase, error) { - return New(dir) + db, err := New(dir) + return storage.NewStorageStore(db), err }, EmptyBatchSize: 0, } @@ -31,15 +32,16 @@ func TestDatabase_ReverseIterator(t *testing.T) { require.NoError(t, err) defer db.Close() - cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}}) + batch, err := db.NewBatch(1) + require.NoError(t, err) for i := 0; i < 100; i++ { key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099 val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099 - cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)}) + require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val))) } - require.NoError(t, db.ApplyChangeset(1, cs)) + require.NoError(t, batch.Write()) // reverse iterator without an end key iter, err := db.ReverseIterator(storeKey1, 1, []byte("key000"), nil) @@ -106,15 +108,16 @@ func TestParallelWrites(t *testing.T) { go func(i int) { <-triggerStartCh defer wg.Done() - cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}}) + batch, err := db.NewBatch(uint64(i + 1)) + require.NoError(t, err) for j := 0; j < kvCount; j++ { key := fmt.Sprintf("key-%d-%03d", i, j) val := fmt.Sprintf("val-%d-%03d", i, j) - cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)}) + require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val))) } - require.NoError(t, db.ApplyChangeset(uint64(i+1), cs)) + require.NoError(t, batch.Write()) }(i) } @@ -155,15 +158,16 @@ func TestParallelWriteAndPruning(t *testing.T) { <-triggerStartCh defer wg.Done() for i := 0; i < latestVersion; i++ { - cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}}) + batch, err := db.NewBatch(uint64(i + 1)) + require.NoError(t, err) for j := 0; j < kvCount; j++ { key := fmt.Sprintf("key-%d-%03d", i, j) val := fmt.Sprintf("val-%d-%03d", i, j) - cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)}) + require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val))) } - require.NoError(t, db.ApplyChangeset(uint64(i+1), cs)) + require.NoError(t, batch.Write()) } }() // start a goroutine that prunes the database diff --git a/store/storage/storage_bench_test.go b/store/storage/storage_bench_test.go index 08ba4c8093ac..5639d5d4e84f 100644 --- a/store/storage/storage_bench_test.go +++ b/store/storage/storage_bench_test.go @@ -1,7 +1,7 @@ //go:build rocksdb // +build rocksdb -package storage +package storage_test import ( "bytes" @@ -13,21 +13,29 @@ import ( "github.com/stretchr/testify/require" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/pebbledb" "cosmossdk.io/store/v2/storage/rocksdb" "cosmossdk.io/store/v2/storage/sqlite" ) +const ( + storeKey1 = "store1" +) + var ( backends = map[string]func(dataDir string) (store.VersionedDatabase, error){ "rocksdb_versiondb_opts": func(dataDir string) (store.VersionedDatabase, error) { - return rocksdb.New(dataDir) + db, err := rocksdb.New(dataDir) + return storage.NewStorageStore(db), err }, "pebbledb_default_opts": func(dataDir string) (store.VersionedDatabase, error) { - return pebbledb.New(dataDir) + db, err := pebbledb.New(dataDir) + return storage.NewStorageStore(db), err }, "btree_sqlite": func(dataDir string) (store.VersionedDatabase, error) { - return sqlite.New(dataDir) + db, err := sqlite.New(dataDir) + return storage.NewStorageStore(db), err }, } rng = rand.New(rand.NewSource(567320)) diff --git a/store/storage/store.go b/store/storage/store.go index cf15bf923a3e..3a6fed65db2d 100644 --- a/store/storage/store.go +++ b/store/storage/store.go @@ -4,7 +4,7 @@ import ( "fmt" "cosmossdk.io/store/v2" - snapshotstypes "cosmossdk.io/store/v2/snapshots/types" + "cosmossdk.io/store/v2/snapshots" ) const ( @@ -12,8 +12,10 @@ const ( defaultBatchBufferSize = 100000 ) -var _ store.VersionedDatabase = (*StorageStore)(nil) -var _ snapshotstypes.StorageSnapshotter = (*StorageStore)(nil) +var ( + _ store.VersionedDatabase = (*StorageStore)(nil) + _ snapshots.StorageSnapshotter = (*StorageStore)(nil) +) // StorageStore is a wrapper around the store.VersionedDatabase interface. type StorageStore struct { From 0598995961c184932a4addbf19f3cbf01bea772d Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Tue, 5 Dec 2023 16:14:01 -0500 Subject: [PATCH 09/10] rocksdb test --- store/storage/rocksdb/db.go | 7 +------ store/storage/rocksdb/db_test.go | 6 +++--- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/store/storage/rocksdb/db.go b/store/storage/rocksdb/db.go index 8ae5146d7b12..d1806cce1e2d 100644 --- a/store/storage/rocksdb/db.go +++ b/store/storage/rocksdb/db.go @@ -92,12 +92,7 @@ func (db *Database) Close() error { } func (db *Database) NewBatch(version uint64) (store.Batch, error) { - batch, err := NewBatch(db, version) - if err != nil { - return nil, err - } - - return batch, nil + return NewBatch(db, version), nil } func (db *Database) getSlice(storeKey string, version uint64, key []byte) (*grocksdb.Slice, error) { diff --git a/store/storage/rocksdb/db_test.go b/store/storage/rocksdb/db_test.go index 9810bd08bb20..8788bfe632e2 100644 --- a/store/storage/rocksdb/db_test.go +++ b/store/storage/rocksdb/db_test.go @@ -34,15 +34,15 @@ func TestDatabase_ReverseIterator(t *testing.T) { require.NoError(t, err) defer db.Close() - cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}}) + batch := NewBatch(db, 1) for i := 0; i < 100; i++ { key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099 val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099 - cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)}) + require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val))) } - require.NoError(t, db.ApplyChangeset(1, cs)) + require.NoError(t, batch.Write()) // reverse iterator without an end key iter, err := db.ReverseIterator(storeKey1, 1, []byte("key000"), nil) From 6d728b19a3c24760b740e32e580bad1b86cd2612 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Wed, 6 Dec 2023 11:44:20 -0500 Subject: [PATCH 10/10] comments --- store/snapshots/manager.go | 6 +++--- store/snapshots/manager_test.go | 2 +- store/snapshots/{types => }/options.go | 2 +- store/storage/database.go | 4 +++- 4 files changed, 8 insertions(+), 6 deletions(-) rename store/snapshots/{types => }/options.go (96%) diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index d55d487bb125..1c2d4ec65a31 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -34,7 +34,7 @@ type Manager struct { extensions map[string]ExtensionSnapshotter // store is the snapshot store where all completed snapshots are persisted. store *Store - opts types.SnapshotOptions + opts SnapshotOptions // commitSnapshotter is the snapshotter for the commitment state. commitSnapshotter CommitSnapshotter // storageSnapshotter is the snapshotter for the storage state. @@ -75,7 +75,7 @@ const ( var ErrOptsZeroSnapshotInterval = errors.New("snaphot-interval must not be 0") // NewManager creates a new manager. -func NewManager(store *Store, opts types.SnapshotOptions, commitSnapshotter CommitSnapshotter, storageSnapshotter StorageSnapshotter, extensions map[string]ExtensionSnapshotter, logger log.Logger) *Manager { +func NewManager(store *Store, opts SnapshotOptions, commitSnapshotter CommitSnapshotter, storageSnapshotter StorageSnapshotter, extensions map[string]ExtensionSnapshotter, logger log.Logger) *Manager { if extensions == nil { extensions = map[string]ExtensionSnapshotter{} } @@ -85,7 +85,7 @@ func NewManager(store *Store, opts types.SnapshotOptions, commitSnapshotter Comm commitSnapshotter: commitSnapshotter, storageSnapshotter: storageSnapshotter, extensions: extensions, - logger: logger, + logger: logger.With("module", "snapshot_manager"), } } diff --git a/store/snapshots/manager_test.go b/store/snapshots/manager_test.go index 34861d5643d2..af5b6eb1e130 100644 --- a/store/snapshots/manager_test.go +++ b/store/snapshots/manager_test.go @@ -13,7 +13,7 @@ import ( "cosmossdk.io/store/v2/snapshots/types" ) -var opts = types.NewSnapshotOptions(1500, 2) +var opts = snapshots.NewSnapshotOptions(1500, 2) func TestManager_List(t *testing.T) { store := setupStore(t) diff --git a/store/snapshots/types/options.go b/store/snapshots/options.go similarity index 96% rename from store/snapshots/types/options.go rename to store/snapshots/options.go index 9c6ec79a11e2..565a0ce105de 100644 --- a/store/snapshots/types/options.go +++ b/store/snapshots/options.go @@ -1,4 +1,4 @@ -package types +package snapshots // SnapshotOptions defines the snapshot strategy used when determining which // heights are snapshotted for state sync. diff --git a/store/storage/database.go b/store/storage/database.go index ff777c2f563c..884981f6138f 100644 --- a/store/storage/database.go +++ b/store/storage/database.go @@ -6,7 +6,9 @@ import ( "cosmossdk.io/store/v2" ) -// Database is an interface that wraps the storage database methods. +// Database is an interface that wraps the storage database methods. A wrapper +// is useful for instances where you want to perform logic that is identical for all SS +// backends, such as restoring snapshots. type Database interface { NewBatch(version uint64) (store.Batch, error) Has(storeKey string, version uint64, key []byte) (bool, error)