From f8ee15d1f47aaad8ad5c8a5539c9939cecc3f70d Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 29 Apr 2022 09:40:26 +0800 Subject: [PATCH] Make extension snapshotter interface safer to use Closes: #11824 Solution: - Use new methods `SnapshotExtension`/`RestoreExtension` to handle payload stream specifically. - Improve unit tests. --- CHANGELOG.md | 1 + docs/architecture/adr-049-state-sync-hooks.md | 20 ++++- snapshots/helpers_test.go | 77 +++++++++++++++++-- snapshots/manager.go | 40 ++++++++-- snapshots/manager_test.go | 18 +++-- snapshots/types/snapshotter.go | 19 ++++- snapshots/types/util.go | 6 +- 7 files changed, 152 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 674e57a761f1..9a01fd7509be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -159,6 +159,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (x/distribution)[\#11457](https://github.com/cosmos/cosmos-sdk/pull/11457) Add amount field to `distr.MsgWithdrawDelegatorRewardResponse` and `distr.MsgWithdrawValidatorCommissionResponse`. * (x/auth/middleware) [#11413](https://github.com/cosmos/cosmos-sdk/pull/11413) Refactor tx middleware to be extensible on tx fee logic. Merged `MempoolFeeMiddleware` and `TxPriorityMiddleware` functionalities into `DeductFeeMiddleware`, make the logic extensible using the `TxFeeChecker` option, the current fee logic is preserved by the default `checkTxFeeWithValidatorMinGasPrices` implementation. Change `RejectExtensionOptionsMiddleware` to `NewExtensionOptionsMiddleware` which is extensible with the `ExtensionOptionChecker` option. Unpack the tx extension options `Any`s to interface `TxExtensionOptionI`. * (migrations) [#1156](https://github.com/cosmos/cosmos-sdk/pull/11556#issuecomment-1091385011) Remove migration code from 0.42 and below. To use previous migrations, checkout previous versions of the cosmos-sdk. +* (store) [#]() Make extension snapshotter interface safer to use. ### Client Breaking Changes diff --git a/docs/architecture/adr-049-state-sync-hooks.md b/docs/architecture/adr-049-state-sync-hooks.md index e1616c2265ba..5cc2b684c4df 100644 --- a/docs/architecture/adr-049-state-sync-hooks.md +++ b/docs/architecture/adr-049-state-sync-hooks.md @@ -3,10 +3,11 @@ ## Changelog - Jan 19, 2022: Initial Draft +- Apr 29, 2022: Safer extension snapshotter interface ## Status -Draft, Under Implementation +Implemented ## Abstract @@ -107,11 +108,16 @@ func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) e On top of the existing `Snapshotter` interface for the `multistore`, we add `ExtensionSnapshotter` interface for the extension snapshotters. Three more function signatures: `SnapshotFormat()`, `SupportedFormats()` and `SnapshotName()` are added to `ExtensionSnapshotter`. ```go +// ExtensionPayloadReader read extension payloads, +// it returns io.EOF when reached either end of stream or the extension boundaries. +type ExtensionPayloadReader = func() ([]byte, error) + +// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream. +type ExtensionPayloadWriter = func([]byte) error + // ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream. // ExtensionSnapshotter has an unique name and manages it's own internal formats. type ExtensionSnapshotter interface { - Snapshotter - // SnapshotName returns the name of snapshotter, it should be unique in the manager. SnapshotName() string @@ -120,6 +126,14 @@ type ExtensionSnapshotter interface { // SupportedFormats returns a list of formats it can restore from. SupportedFormats() []uint32 + + // SnapshotExtension writes extension payloads into the underlying protobuf stream. + SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error + + // RestoreExtension restores an extension state snapshot, + // the payload reader returns `io.EOF` when reached the extension boundaries. + RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error + } ``` diff --git a/snapshots/helpers_test.go b/snapshots/helpers_test.go index 24051a17a927..2dc18102c3cf 100644 --- a/snapshots/helpers_test.go +++ b/snapshots/helpers_test.go @@ -18,6 +18,7 @@ import ( "github.com/cosmos/cosmos-sdk/snapshots" snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" "github.com/cosmos/cosmos-sdk/testutil" + sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) @@ -62,7 +63,7 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte { } // snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks. -func snapshotItems(items [][]byte) [][]byte { +func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]byte { // copy the same parameters from the code snapshotChunkSize := uint64(10e6) snapshotBufferSize := int(snapshotChunkSize) @@ -74,8 +75,20 @@ func snapshotItems(items [][]byte) [][]byte { zWriter, _ := zlib.NewWriterLevel(bufWriter, 7) protoWriter := protoio.NewDelimitedWriter(zWriter) for _, item := range items { - _ = snapshottypes.WriteExtensionItem(protoWriter, item) + _ = snapshottypes.WriteExtensionPayload(protoWriter, item) } + // write extension metadata + _ = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_Extension{ + Extension: &snapshottypes.SnapshotExtensionMeta{ + Name: ext.SnapshotName(), + Format: ext.SnapshotFormat(), + }, + }, + }) + _ = ext.SnapshotExtension(0, func(payload []byte) error { + return snapshottypes.WriteExtensionPayload(protoWriter, payload) + }) _ = protoWriter.Close() _ = zWriter.Close() _ = bufWriter.Flush() @@ -110,10 +123,11 @@ func (m *mockSnapshotter) Restore( return snapshottypes.SnapshotItem{}, errors.New("already has contents") } + var item snapshottypes.SnapshotItem m.items = [][]byte{} for { - item := &snapshottypes.SnapshotItem{} - err := protoReader.ReadMsg(item) + item.Reset() + err := protoReader.ReadMsg(&item) if err == io.EOF { break } else if err != nil { @@ -121,17 +135,17 @@ func (m *mockSnapshotter) Restore( } payload := item.GetExtensionPayload() if payload == nil { - return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message") + break } m.items = append(m.items, payload.Payload) } - return snapshottypes.SnapshotItem{}, nil + return item, nil } func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { for _, item := range m.items { - if err := snapshottypes.WriteExtensionItem(protoWriter, item); err != nil { + if err := snapshottypes.WriteExtensionPayload(protoWriter, item); err != nil { return err } } @@ -216,3 +230,52 @@ func (m *hungSnapshotter) Restore( ) (snapshottypes.SnapshotItem, error) { panic("not implemented") } + +type extSnapshotter struct { + state []uint64 +} + +func newExtSnapshotter(count int) *extSnapshotter { + state := make([]uint64, 0, count) + for i := 0; i < count; i++ { + state = append(state, uint64(i)) + } + return &extSnapshotter{ + state, + } +} + +func (s *extSnapshotter) SnapshotName() string { + return "mock" +} + +func (s *extSnapshotter) SnapshotFormat() uint32 { + return 1 +} + +func (s *extSnapshotter) SupportedFormats() []uint32 { + return []uint32{1} +} + +func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshottypes.ExtensionPayloadWriter) error { + for _, i := range s.state { + if err := payloadWriter(sdk.Uint64ToBigEndian(uint64(i))); err != nil { + return err + } + } + return nil +} + +func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshottypes.ExtensionPayloadReader) error { + for { + payload, err := payloadReader() + if err == io.EOF { + break + } else if err != nil { + return err + } + s.state = append(s.state, sdk.BigEndianToUint64(payload)) + } + // finalize restoration + return nil +} diff --git a/snapshots/manager.go b/snapshots/manager.go index 58986aab2716..1ef25471a561 100644 --- a/snapshots/manager.go +++ b/snapshots/manager.go @@ -83,6 +83,9 @@ func NewManager(store *Store, opts types.SnapshotOptions, multistore types.Snaps // RegisterExtensions register extension snapshotters to manager func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error { + if m.extensions == nil { + m.extensions = make(map[string]types.ExtensionSnapshotter, len(extensions)) + } for _, extension := range extensions { name := extension.SnapshotName() if _, ok := m.extensions[name]; ok { @@ -214,7 +217,10 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) { streamWriter.CloseWithError(err) return } - if err := extension.Snapshot(height, streamWriter); err != nil { + payloadWriter := func(payload []byte) error { + return types.WriteExtensionPayload(streamWriter, payload) + } + if err := extension.SnapshotExtension(height, payloadWriter); err != nil { streamWriter.CloseWithError(err) return } @@ -304,24 +310,40 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { // restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { + var nextItem types.SnapshotItem + streamReader, err := NewStreamReader(chChunks) if err != nil { return err } defer streamReader.Close() - next, err := m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader) + // payloadReader reads an extension payload for extension snapshotter, it returns `io.EOF` at extension boundaries. + payloadReader := func() ([]byte, error) { + nextItem.Reset() + if err := streamReader.ReadMsg(&nextItem); err != nil { + return nil, err + } + payload := nextItem.GetExtensionPayload() + if payload == nil { + return nil, io.EOF + } + return payload.Payload, nil + } + + nextItem, err = m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader) if err != nil { return sdkerrors.Wrap(err, "multistore restore") } + for { - if next.Item == nil { + if nextItem.Item == nil { // end of stream break } - metadata := next.GetExtension() + metadata := nextItem.GetExtension() if metadata == nil { - return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", next.Item) + return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", nextItem.Item) } extension, ok := m.extensions[metadata.Name] if !ok { @@ -330,10 +352,14 @@ func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.Re if !IsFormatSupported(extension, metadata.Format) { return sdkerrors.Wrapf(types.ErrUnknownFormat, "format %v for extension %s", metadata.Format, metadata.Name) } - next, err = extension.Restore(snapshot.Height, metadata.Format, streamReader) - if err != nil { + + if err := extension.RestoreExtension(snapshot.Height, metadata.Format, payloadReader); err != nil { return sdkerrors.Wrapf(err, "extension %s restore", metadata.Name) } + + if nextItem.GetExtensionPayload() != nil { + return sdkerrors.Wrapf(err, "extension %s don't exhausted payload stream", metadata.Name) + } } return nil } diff --git a/snapshots/manager_test.go b/snapshots/manager_test.go index 7fbddd6c7d6d..58a302e87ca8 100644 --- a/snapshots/manager_test.go +++ b/snapshots/manager_test.go @@ -68,11 +68,15 @@ func TestManager_Take(t *testing.T) { items: items, prunedHeights: make(map[int64]struct{}), } - expectChunks := snapshotItems(items) + extSnapshotter := newExtSnapshotter(10) + + expectChunks := snapshotItems(items, extSnapshotter) manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) + err := manager.RegisterExtensions(extSnapshotter) + require.NoError(t, err) // nil manager should return error - _, err := (*snapshots.Manager)(nil).Create(1) + _, err = (*snapshots.Manager)(nil).Create(1) require.Error(t, err) // creating a snapshot at a lower height than the latest should error @@ -91,7 +95,7 @@ func TestManager_Take(t *testing.T) { Height: 5, Format: snapshotter.SnapshotFormat(), Chunks: 1, - Hash: []uint8{0xcd, 0x17, 0x9e, 0x7f, 0x28, 0xb6, 0x82, 0x90, 0xc7, 0x25, 0xf3, 0x42, 0xac, 0x65, 0x73, 0x50, 0xaa, 0xa0, 0x10, 0x5c, 0x40, 0x8c, 0xd5, 0x1, 0xed, 0x82, 0xb5, 0xca, 0x8b, 0xe0, 0x83, 0xa2}, + Hash: []uint8{0x89, 0xfa, 0x18, 0xbc, 0x5a, 0xe3, 0xdc, 0x36, 0xa6, 0x95, 0x5, 0x17, 0xf9, 0x2, 0x1a, 0x55, 0x36, 0x16, 0x5d, 0x4b, 0x8b, 0x2b, 0x3d, 0xfd, 0xe, 0x2f, 0xb6, 0x40, 0x6b, 0xc3, 0xbc, 0x23}, Metadata: types.Metadata{ ChunkHashes: checksums(expectChunks), }, @@ -133,7 +137,10 @@ func TestManager_Restore(t *testing.T) { target := &mockSnapshotter{ prunedHeights: make(map[int64]struct{}), } + extSnapshotter := newExtSnapshotter(0) manager := snapshots.NewManager(store, opts, target, nil, log.NewNopLogger()) + err := manager.RegisterExtensions(extSnapshotter) + require.NoError(t, err) expectItems := [][]byte{ {1, 2, 3}, @@ -141,10 +148,10 @@ func TestManager_Restore(t *testing.T) { {7, 8, 9}, } - chunks := snapshotItems(expectItems) + chunks := snapshotItems(expectItems, newExtSnapshotter(10)) // Restore errors on invalid format - err := manager.Restore(types.Snapshot{ + err = manager.Restore(types.Snapshot{ Height: 3, Format: 0, Hash: []byte{1, 2, 3}, @@ -204,6 +211,7 @@ func TestManager_Restore(t *testing.T) { } assert.Equal(t, expectItems, target.items) + assert.Equal(t, 10, len(extSnapshotter.state)) // Starting a new restore should fail now, because the target already has contents. err = manager.Restore(types.Snapshot{ diff --git a/snapshots/types/snapshotter.go b/snapshots/types/snapshotter.go index 76f800484a49..94d1b4398284 100644 --- a/snapshots/types/snapshotter.go +++ b/snapshots/types/snapshotter.go @@ -22,16 +22,20 @@ type Snapshotter interface { // to determine which heights to retain until after the snapshot is complete. SetSnapshotInterval(snapshotInterval uint64) - // Restore restores a state snapshot, taking snapshot chunk readers as input. - // If the ready channel is non-nil, it returns a ready signal (by being closed) once the - // restorer is ready to accept chunks. + // Restore restores a state snapshot, taking the reader of protobuf message stream as input. Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error) } +// ExtensionPayloadReader read extension payloads, +// it returns io.EOF when reached either end of stream or the extension boundaries. +type ExtensionPayloadReader = func() ([]byte, error) + +// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream. +type ExtensionPayloadWriter = func([]byte) error + // ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream. // ExtensionSnapshotter has an unique name and manages it's own internal formats. type ExtensionSnapshotter interface { - Snapshotter // SnapshotName returns the name of snapshotter, it should be unique in the manager. SnapshotName() string @@ -43,4 +47,11 @@ type ExtensionSnapshotter interface { // SupportedFormats returns a list of formats it can restore from. SupportedFormats() []uint32 + + // SnapshotExtension writes extension payloads into the underlying protobuf stream. + SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error + + // RestoreExtension restores an extension state snapshot, + // the payload reader returns `io.EOF` when reached the extension boundaries. + RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error } diff --git a/snapshots/types/util.go b/snapshots/types/util.go index 125ea6fb4610..6ee39891e8d8 100644 --- a/snapshots/types/util.go +++ b/snapshots/types/util.go @@ -4,12 +4,12 @@ import ( protoio "github.com/gogo/protobuf/io" ) -// WriteExtensionItem writes an item payload for current extention snapshotter. -func WriteExtensionItem(protoWriter protoio.Writer, item []byte) error { +// WriteExtensionPayload writes an extension payload for current extention snapshotter. +func WriteExtensionPayload(protoWriter protoio.Writer, payload []byte) error { return protoWriter.WriteMsg(&SnapshotItem{ Item: &SnapshotItem_ExtensionPayload{ ExtensionPayload: &SnapshotExtensionPayload{ - Payload: item, + Payload: payload, }, }, })