From 4f59be0594c83b12ad18683bbeac79dd83e784d4 Mon Sep 17 00:00:00 2001 From: Yurii Oleksyshyn Date: Wed, 8 Jan 2025 12:45:56 +0200 Subject: [PATCH] Added database migration as part of model upgrade for dynamic protocol state. Added implementation for DKG end state migration between v1 and v2 --- state/protocol/protocol_state/kvstore.go | 2 +- .../protocol/protocol_state/kvstore/models.go | 59 +++++++++++++++---- .../protocol_state/kvstore/models_test.go | 18 +++--- .../protocol_state/mock/kv_store_api.go | 23 ++++++-- .../state/mutable_protocol_state_test.go | 12 ++-- .../protocol_state/state/protocol_state.go | 35 +++++++---- storage/badger/operation/dkg.go | 29 +++++++++ 7 files changed, 133 insertions(+), 45 deletions(-) diff --git a/state/protocol/protocol_state/kvstore.go b/state/protocol/protocol_state/kvstore.go index 7a8d784c752..dca36ab0f08 100644 --- a/state/protocol/protocol_state/kvstore.go +++ b/state/protocol/protocol_state/kvstore.go @@ -40,7 +40,7 @@ type KVStoreAPI interface { // Expected errors during normal operations: // - kvstore.ErrIncompatibleVersionChange if replicating the Parent Snapshot into a Snapshot // with the specified `protocolVersion` is not supported. - Replicate(protocolVersion uint64) (KVStoreMutator, error) + Replicate(protocolVersion uint64) (KVStoreMutator, transaction.DeferredDBUpdate, error) } // KVStoreMutator is the latest read-writer interface to the Protocol State key-value store. 
diff --git a/state/protocol/protocol_state/kvstore/models.go b/state/protocol/protocol_state/kvstore/models.go
index 752a26b0d61..d363731b391 100644
--- a/state/protocol/protocol_state/kvstore/models.go
+++ b/state/protocol/protocol_state/kvstore/models.go
@@ -2,6 +2,8 @@ package kvstore
 
 import (
 	"fmt"
+	"github.com/onflow/flow-go/storage/badger/operation"
+	"github.com/onflow/flow-go/storage/badger/transaction"
 
 	clone "github.com/huandu/go-clone/generic" //nolint:goimports
 
@@ -74,14 +76,14 @@ func (model *Modelv0) ID() flow.Identifier {
 // Expected errors during normal operations:
 //   - ErrIncompatibleVersionChange if replicating the Parent Snapshot into a Snapshot
 //     with the specified `protocolVersion` is not supported.
-func (model *Modelv0) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, error) {
+func (model *Modelv0) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error) {
 	currentVersion := model.GetProtocolStateVersion()
 	if currentVersion == protocolVersion {
 		// no need for migration, return a complete copy
-		return clone.Clone(model), nil
+		return clone.Clone(model), nil, nil
 	}
 	if protocolVersion != 1 {
-		return nil, fmt.Errorf("unsupported replication version %d, expect %d: %w",
+		return nil, nil, fmt.Errorf("unsupported replication version %d, expect %d: %w",
 			protocolVersion, 1, ErrIncompatibleVersionChange)
 	}
 
@@ -90,9 +92,9 @@ func (model *Modelv0) Replicate(protocolVersion uint64) (protocol_state.KVStoreM
 		Modelv0: clone.Clone(*model),
 	}
 	if v1.GetProtocolStateVersion() != protocolVersion {
-		return nil, fmt.Errorf("sanity check: replicate resulted in unexpected version (%d != %d)", v1.GetProtocolStateVersion(), protocolVersion)
+		return nil, nil, fmt.Errorf("sanity check: replicate resulted in unexpected version (%d != %d)", v1.GetProtocolStateVersion(), protocolVersion)
 	}
-	return v1, nil
+	return v1, nil, nil
 }
 
 // VersionedEncode encodes the key-value store, returning the version separately
@@ -170,16 +172,16 @@ func (model *Modelv1) ID() flow.Identifier {
 // Expected errors during normal operations:
 //   - ErrIncompatibleVersionChange if replicating the Parent Snapshot into a Snapshot
 //     with the specified `protocolVersion` is not supported.
-func (model *Modelv1) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, error) {
+func (model *Modelv1) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error) {
 	currentVersion := model.GetProtocolStateVersion()
 	if currentVersion == protocolVersion {
 		// no need for migration, return a complete copy
-		return clone.Clone(model), nil
+		return clone.Clone(model), nil, nil
 	}
 	nextVersion := currentVersion + 1
 	if protocolVersion != nextVersion {
 		// can only Replicate into model with numerically consecutive version
-		return nil, fmt.Errorf("unsupported replication version %d, expect %d: %w",
-			protocolVersion, 1, ErrIncompatibleVersionChange)
+		return nil, nil, fmt.Errorf("unsupported replication version %d, expect %d: %w",
+			protocolVersion, nextVersion, ErrIncompatibleVersionChange)
 	}
 
@@ -188,9 +190,40 @@ func (model *Modelv1) Replicate(protocolVersion uint64) (protocol_state.KVStoreM
 		Modelv1: clone.Clone(*model),
 	}
 	if v2.GetProtocolStateVersion() != protocolVersion {
-		return nil, fmt.Errorf("sanity check: replicate resulted in unexpected version (%d != %d)", v2.GetProtocolStateVersion(), protocolVersion)
+		return nil, nil, fmt.Errorf("sanity check: replicate resulted in unexpected version (%d != %d)", v2.GetProtocolStateVersion(), protocolVersion)
 	}
-	return v2, nil
+
+	dbMigration := func(tx *transaction.Tx) error {
+		epochs, states, err := operation.RetrieveStoredEpochStates()(tx.DBTxn)
+		if err != nil {
+			return fmt.Errorf("could not retrieve stored epoch states: %w", err)
+		}
+
+		convertState := func(state uint32) flow.DKGState {
+			switch state {
+			case 0: // DKGEndStateUnknown
+				return flow.DKGStateUninitialized
+			case 1: // DKGEndStateSuccess
+				return flow.RandomBeaconKeyCommitted
+			case 2, 3, 4: // DKGEndStateInconsistentKey, DKGEndStateNoKey, DKGEndStateDKGFailure
+				return flow.DKGStateFailure
+			default: // NOTE(review): unrecognized stored values silently become uninitialized - confirm this is intended
+				return flow.DKGStateUninitialized
+			}
+		}
+
+		for i, epoch := range epochs {
+			from := states[i]
+			to := convertState(from)
+			err = operation.UpsertDKGStateForEpoch(epoch, to)(tx.DBTxn)
+			if err != nil {
+				return fmt.Errorf("could not upsert DKG state for epoch %d: %w", epoch, err)
+			}
+		}
+		return nil
+	}
+
+	return v2, dbMigration, nil
 }
 
 // VersionedEncode encodes the key-value store, returning the version separately
@@ -228,13 +261,13 @@ func (model *Modelv2) ID() flow.Identifier {
 // Expected errors during normal operations:
 //   - ErrIncompatibleVersionChange if replicating the Parent Snapshot into a Snapshot
 //     with the specified `protocolVersion` is not supported.
-func (model *Modelv2) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, error) {
+func (model *Modelv2) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error) {
 	currentVersion := model.GetProtocolStateVersion()
 	if currentVersion == protocolVersion {
 		// no need for migration, return a complete copy
-		return clone.Clone(model), nil
+		return clone.Clone(model), nil, nil
 	} else {
-		return nil, fmt.Errorf("unsupported replication version %d: %w",
+		return nil, nil, fmt.Errorf("unsupported replication version %d: %w",
 			protocolVersion, ErrIncompatibleVersionChange)
 	}
 }
diff --git a/state/protocol/protocol_state/kvstore/models_test.go b/state/protocol/protocol_state/kvstore/models_test.go
index f812182f9e6..f4d36af3a92 100644
--- a/state/protocol/protocol_state/kvstore/models_test.go
+++ b/state/protocol/protocol_state/kvstore/models_test.go
@@ -115,7 +115,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) {
 				},
 			},
 		}
-		cpy, err := model.Replicate(model.GetProtocolStateVersion())
+		cpy, _, err := model.Replicate(model.GetProtocolStateVersion())
 		require.NoError(t, err)
 		require.True(t, reflect.DeepEqual(model, cpy)) // expect the same model
require.Equal(t, cpy.ID(), model.ID()) @@ -132,7 +132,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) { }, }, } - newVersion, err := model.Replicate(1) + newVersion, _, err := model.Replicate(1) require.NoError(t, err) require.Equal(t, uint64(1), newVersion.GetProtocolStateVersion()) require.NotEqual(t, newVersion.ID(), model.ID(), "two models with the same data but different version must have different ID") @@ -142,7 +142,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) { }) t.Run("v0-invalid-upgrade", func(t *testing.T) { model := &kvstore.Modelv0{} - newVersion, err := model.Replicate(model.GetProtocolStateVersion() + 10) + newVersion, _, err := model.Replicate(model.GetProtocolStateVersion() + 10) require.ErrorIs(t, err, kvstore.ErrIncompatibleVersionChange) require.Nil(t, newVersion) }) @@ -158,7 +158,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) { EpochStateID: unittest.IdentifierFixture(), }, } - cpy, err := model.Replicate(model.GetProtocolStateVersion()) + cpy, _, err := model.Replicate(model.GetProtocolStateVersion()) require.NoError(t, err) require.True(t, reflect.DeepEqual(model, cpy)) @@ -171,7 +171,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) { model.GetProtocolStateVersion() - 1, model.GetProtocolStateVersion() + 10, } { - newVersion, err := model.Replicate(version) + newVersion, _, err := model.Replicate(version) require.ErrorIs(t, err, kvstore.ErrIncompatibleVersionChange) require.Nil(t, newVersion) } @@ -187,7 +187,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) { }, }, } - newVersion, err := model.Replicate(2) + newVersion, _, err := model.Replicate(2) require.NoError(t, err) require.Equal(t, uint64(2), newVersion.GetProtocolStateVersion()) require.NotEqual(t, newVersion.ID(), model.ID(), "two models with the same data but different version must have different ID") @@ -203,7 +203,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) { model.GetProtocolStateVersion() + 1, model.GetProtocolStateVersion() + 10, } { - newVersion, err := 
model.Replicate(version) + newVersion, _, err := model.Replicate(version) require.ErrorIs(t, err, kvstore.ErrIncompatibleVersionChange) require.Nil(t, newVersion) } @@ -268,7 +268,7 @@ func TestKVStoreMutator_SetEpochExtensionViewCount(t *testing.T) { t.Run("happy-path", func(t *testing.T) { store, err := kvstore.NewDefaultKVStore(safetyParams.FinalizationSafetyThreshold, safetyParams.EpochExtensionViewCount, epochStateID) require.NoError(t, err) - mutator, err := store.Replicate(store.GetProtocolStateVersion()) + mutator, _, err := store.Replicate(store.GetProtocolStateVersion()) require.NoError(t, err) newValue := safetyParams.FinalizationSafetyThreshold*2 + 1 @@ -280,7 +280,7 @@ func TestKVStoreMutator_SetEpochExtensionViewCount(t *testing.T) { t.Run("invalid-value", func(t *testing.T) { store, err := kvstore.NewDefaultKVStore(safetyParams.FinalizationSafetyThreshold, safetyParams.EpochExtensionViewCount, epochStateID) require.NoError(t, err) - mutator, err := store.Replicate(store.GetProtocolStateVersion()) + mutator, _, err := store.Replicate(store.GetProtocolStateVersion()) require.NoError(t, err) oldValue := mutator.GetEpochExtensionViewCount() diff --git a/state/protocol/protocol_state/mock/kv_store_api.go b/state/protocol/protocol_state/mock/kv_store_api.go index 010f8dfdd17..c5a7036f3d0 100644 --- a/state/protocol/protocol_state/mock/kv_store_api.go +++ b/state/protocol/protocol_state/mock/kv_store_api.go @@ -9,6 +9,8 @@ import ( protocol "github.com/onflow/flow-go/state/protocol" protocol_state "github.com/onflow/flow-go/state/protocol/protocol_state" + + transaction "github.com/onflow/flow-go/storage/badger/transaction" ) // KVStoreAPI is an autogenerated mock type for the KVStoreAPI type @@ -131,7 +133,7 @@ func (_m *KVStoreAPI) ID() flow.Identifier { } // Replicate provides a mock function with given fields: protocolVersion -func (_m *KVStoreAPI) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, error) { +func (_m *KVStoreAPI) 
Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error) { ret := _m.Called(protocolVersion) if len(ret) == 0 { @@ -139,8 +141,9 @@ func (_m *KVStoreAPI) Replicate(protocolVersion uint64) (protocol_state.KVStoreM } var r0 protocol_state.KVStoreMutator - var r1 error - if rf, ok := ret.Get(0).(func(uint64) (protocol_state.KVStoreMutator, error)); ok { + var r1 transaction.DeferredDBUpdate + var r2 error + if rf, ok := ret.Get(0).(func(uint64) (protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error)); ok { return rf(protocolVersion) } if rf, ok := ret.Get(0).(func(uint64) protocol_state.KVStoreMutator); ok { @@ -151,13 +154,21 @@ func (_m *KVStoreAPI) Replicate(protocolVersion uint64) (protocol_state.KVStoreM } } - if rf, ok := ret.Get(1).(func(uint64) error); ok { + if rf, ok := ret.Get(1).(func(uint64) transaction.DeferredDBUpdate); ok { r1 = rf(protocolVersion) } else { - r1 = ret.Error(1) + if ret.Get(1) != nil { + r1 = ret.Get(1).(transaction.DeferredDBUpdate) + } } - return r0, r1 + if rf, ok := ret.Get(2).(func(uint64) error); ok { + r2 = rf(protocolVersion) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } // VersionedEncode provides a mock function with given fields: diff --git a/state/protocol/protocol_state/state/mutable_protocol_state_test.go b/state/protocol/protocol_state/state/mutable_protocol_state_test.go index ebb914d499b..0d805d5e491 100644 --- a/state/protocol/protocol_state/state/mutable_protocol_state_test.go +++ b/state/protocol/protocol_state/state/mutable_protocol_state_test.go @@ -65,7 +65,7 @@ func (s *StateMutatorSuite) SetupTest() { s.parentState.On("GetProtocolStateVersion").Return(s.latestProtocolVersion) s.parentState.On("GetVersionUpgrade").Return(nil) // no version upgrade by default s.parentState.On("ID").Return(unittest.IdentifierFixture(), nil) - s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil) + s.parentState.On("Replicate", 
s.latestProtocolVersion).Return(&s.evolvingState, nil, nil) // state replicated from the parent state; by default exactly the same as the parent state // CAUTION: ID of evolving state must be defined by the tests. @@ -274,7 +274,7 @@ func (s *StateMutatorSuite) Test_VersionUpgrade() { Data: newVersion, ActivationView: s.candidate.View + 1, }).Once() - s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil) + s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil, nil) s.evolvingState.On("ID").Return(parentStateID, nil).Once() for i := range s.kvStateMachines { @@ -295,7 +295,7 @@ func (s *StateMutatorSuite) Test_VersionUpgrade() { Data: newVersion, ActivationView: s.candidate.View, }).Once() - s.parentState.On("Replicate", newVersion).Return(&s.evolvingState, nil) + s.parentState.On("Replicate", newVersion).Return(&s.evolvingState, nil, nil) s.evolvingState.On("ID").Return(newStateID, nil).Once() for i := range s.kvStateMachines { @@ -318,7 +318,7 @@ func (s *StateMutatorSuite) Test_VersionUpgrade() { Data: newVersion, ActivationView: s.candidate.View - 1, }).Once() - s.parentState.On("Replicate", newVersion).Return(&s.evolvingState, nil) + s.parentState.On("Replicate", newVersion).Return(&s.evolvingState, nil, nil) s.evolvingState.On("ID").Return(newStateID, nil).Once() for i := range s.kvStateMachines { @@ -339,7 +339,7 @@ func (s *StateMutatorSuite) Test_VersionUpgrade() { Data: s.latestProtocolVersion, ActivationView: s.candidate.View - 1, }).Once() - s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil) + s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil, nil) s.evolvingState.On("ID").Return(parentStateID, nil).Once() for i := range s.kvStateMachines { @@ -414,7 +414,7 @@ func (s *StateMutatorSuite) Test_ReplicateFails() { s.parentState = *protocol_statemock.NewKVStoreAPI(s.T()) 
s.parentState.On("GetProtocolStateVersion").Return(s.latestProtocolVersion) s.parentState.On("GetVersionUpgrade").Return(nil).Once() - s.parentState.On("Replicate", s.latestProtocolVersion).Return(nil, exception).Once() + s.parentState.On("Replicate", s.latestProtocolVersion).Return(nil, nil, exception).Once() // `SetupTest` initializes the mock factories to expect to be called, so we overwrite the mocks here: s.kvStateMachineFactories[0] = *protocol_statemock.NewKeyValueStoreStateMachineFactory(s.T()) diff --git a/state/protocol/protocol_state/state/protocol_state.go b/state/protocol/protocol_state/state/protocol_state.go index 9316942f4d3..2e78f3c8e88 100644 --- a/state/protocol/protocol_state/state/protocol_state.go +++ b/state/protocol/protocol_state/state/protocol_state.go @@ -205,16 +205,21 @@ func (s *MutableProtocolState) EvolveState( return flow.ZeroID, nil, fmt.Errorf("extracting service events from candidate seals failed: %w", err) } - parentStateID, stateMachines, evolvingState, err := s.initializeOrthogonalStateMachines(parentBlockID, candidateView) + parentStateID, stateMachines, evolvingState, dbMigrationOp, err := s.initializeOrthogonalStateMachines(parentBlockID, candidateView) if err != nil { return flow.ZeroID, nil, fmt.Errorf("failure initializing sub-state machines for evolving the Protocol State: %w", err) } + blockPersistDbUpdates := transaction.NewDeferredBlockPersist() + if dbMigrationOp != nil { + blockPersistDbUpdates.AddDbOp(dbMigrationOp) + } resultingStateID, dbUpdates, err := s.build(parentStateID, stateMachines, serviceEvents, evolvingState) if err != nil { return flow.ZeroID, nil, fmt.Errorf("evolving and building the resulting Protocol State failed: %w", err) } - return resultingStateID, dbUpdates, nil + blockPersistDbUpdates.AddIndexingOps(dbUpdates.Pending()) + return resultingStateID, blockPersistDbUpdates, nil } // initializeOrthogonalStateMachines instantiates the sub-state machines that in aggregate evolve the protocol state. 
@@ -232,13 +237,13 @@ func (s *MutableProtocolState) EvolveState(
 func (s *MutableProtocolState) initializeOrthogonalStateMachines(
 	parentBlockID flow.Identifier,
 	candidateView uint64,
-) (flow.Identifier, []protocol_state.KeyValueStoreStateMachine, protocol_state.KVStoreMutator, error) {
+) (flow.Identifier, []protocol_state.KeyValueStoreStateMachine, protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error) {
 	parentState, err := s.kvStoreSnapshots.ByBlockID(parentBlockID)
 	if err != nil {
 		if errors.Is(err, storage.ErrNotFound) {
-			return flow.ZeroID, nil, nil, irrecoverable.NewExceptionf("Protocol State at parent block %v was not found: %w", parentBlockID, err)
+			return flow.ZeroID, nil, nil, nil, irrecoverable.NewExceptionf("Protocol State at parent block %v was not found: %w", parentBlockID, err)
 		}
-		return flow.ZeroID, nil, nil, fmt.Errorf("unexpected exception while retrieving Protocol State at parent block %v: %w", parentBlockID, err)
+		return flow.ZeroID, nil, nil, nil, fmt.Errorf("unexpected exception while retrieving Protocol State at parent block %v: %w", parentBlockID, err)
 	}
 
 	protocolVersion := parentState.GetProtocolStateVersion()
@@ -248,23 +253,37 @@ func (s *MutableProtocolState) initializeOrthogonalStateMachines(
 		}
 	}
 
-	evolvingState, err := parentState.Replicate(protocolVersion)
+	evolvingState, dbMigrationOp, err := parentState.Replicate(protocolVersion)
 	if err != nil {
 		if errors.Is(err, kvstore.ErrIncompatibleVersionChange) {
-			return flow.ZeroID, nil, nil, irrecoverable.NewExceptionf("replicating parent block's protocol state failed due to unsupported version: %w", err)
+			return flow.ZeroID, nil, nil, nil, irrecoverable.NewExceptionf("replicating parent block's protocol state failed due to unsupported version: %w", err)
+		}
+		return flow.ZeroID, nil, nil, nil, fmt.Errorf("could not replicate parent KV store (version=%d) to protocol version %d: %w", parentState.GetProtocolStateVersion(), protocolVersion, err)
+	}
+	if dbMigrationOp != nil {
+		// Bind the operation returned by Replicate to its own variable before wrapping it.
+		// A closure captures the variable, not its value: calling `dbMigrationOp` from inside
+		// the wrapper assigned to `dbMigrationOp` would invoke the wrapper itself and recurse forever.
+		migrateStorage := dbMigrationOp
+		dbMigrationOp = func(tx *transaction.Tx) error {
+			err := migrateStorage(tx)
+			if err != nil {
+				return fmt.Errorf("could not migrate storage from %d to %d: %w",
+					parentState.GetProtocolStateVersion(), protocolVersion, err)
+			}
+			return nil
 		}
-		return flow.ZeroID, nil, nil, fmt.Errorf("could not replicate parent KV store (version=%d) to protocol version %d: %w", parentState.GetProtocolStateVersion(), protocolVersion, err)
 	}
 
 	stateMachines := make([]protocol_state.KeyValueStoreStateMachine, 0, len(s.kvStateMachineFactories))
 	for _, factory := range s.kvStateMachineFactories {
 		stateMachine, err := factory.Create(candidateView, parentBlockID, parentState, evolvingState)
 		if err != nil {
-			return flow.ZeroID, nil, nil, fmt.Errorf("could not create state machine: %w", err)
+			return flow.ZeroID, nil, nil, nil, fmt.Errorf("could not create state machine: %w", err)
 		}
 		stateMachines = append(stateMachines, stateMachine)
 	}
-	return parentState.ID(), stateMachines, evolvingState, nil
+	return parentState.ID(), stateMachines, evolvingState, dbMigrationOp, nil
 }
 
 // serviceEventsFromSeals arranges the sealed results in order of increasing height of the executed blocks
diff --git a/storage/badger/operation/dkg.go b/storage/badger/operation/dkg.go
index 1d9334dfbe1..efc8f8f24aa 100644
--- a/storage/badger/operation/dkg.go
+++ b/storage/badger/operation/dkg.go
@@ -1,6 +1,7 @@
 package operation
 
 import (
+	"encoding/binary"
 	"errors"
 
 	"github.com/dgraph-io/badger/v2"
@@ -74,3 +75,46 @@ func UpsertDKGStateForEpoch(epochCounter uint64, newState flow.DKGState) func(*b
 func RetrieveDKGStateForEpoch(epochCounter uint64, currentState *flow.DKGState) func(*badger.Txn) error {
 	return retrieve(makePrefix(codeDKGState, epochCounter), currentState)
 }
+
+// RetrieveStoredEpochStates returns a database operation that traverses all entries stored under
+// the `codeDKGState` prefix. It returns two parallel slices: the epoch counters decoded from the
+// keys and the raw numeric states decoded from the values, where the i-th state belongs to the
+// i-th epoch counter. A key too short to hold an epoch counter is reported as an error through
+// the `handle` callback (assumes `traverse` surfaces errors returned by `handle` - TODO confirm).
+func RetrieveStoredEpochStates() func(*badger.Txn) ([]uint64, []uint32, error) {
+	return func(tx *badger.Txn) ([]uint64, []uint32, error) {
+		var epochs []uint64
+		var states []uint32
+		err := traverse(makePrefix(codeDKGState), func() (checkFunc, createFunc, handleFunc) {
+			var epochCounter uint64
+			var keyErr error
+			check := func(key []byte) bool {
+				// the key must contain the 1-byte code followed by the 8-byte epoch counter;
+				// without this guard, binary.BigEndian.Uint64 panics on a shorter key
+				if len(key) < 1+8 {
+					keyErr = errors.New("malformed DKG state key: too short to contain an epoch counter")
+					return true
+				}
+				epochCounter = binary.BigEndian.Uint64(key[1:]) // omit code
+				return true
+			}
+			var state uint32
+			create := func() interface{} {
+				return &state
+			}
+			handle := func() error {
+				if keyErr != nil {
+					return keyErr
+				}
+				epochs = append(epochs, epochCounter)
+				states = append(states, state)
+				return nil
+			}
+			return check, create, handle
+		})(tx)
+		if err != nil {
+			return nil, nil, err
+		}
+		return epochs, states, nil
+	}
+}