Skip to content

Commit

Permalink
Added database migration as part of model upgrade for dynamic protoco…
Browse files Browse the repository at this point in the history
…l state. Added implementation for DKG end state migration between v1 and v2
  • Loading branch information
durkmurder committed Jan 8, 2025
1 parent 7e6e79d commit 4f59be0
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 45 deletions.
2 changes: 1 addition & 1 deletion state/protocol/protocol_state/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type KVStoreAPI interface {
// Expected errors during normal operations:
// - kvstore.ErrIncompatibleVersionChange if replicating the Parent Snapshot into a Snapshot
// with the specified `protocolVersion` is not supported.
Replicate(protocolVersion uint64) (KVStoreMutator, error)
Replicate(protocolVersion uint64) (KVStoreMutator, transaction.DeferredDBUpdate, error)
}

// KVStoreMutator is the latest read-writer interface to the Protocol State key-value store.
Expand Down
59 changes: 46 additions & 13 deletions state/protocol/protocol_state/kvstore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package kvstore

import (
"fmt"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/badger/transaction"

clone "github.com/huandu/go-clone/generic" //nolint:goimports

Expand Down Expand Up @@ -74,14 +76,14 @@ func (model *Modelv0) ID() flow.Identifier {
// Expected errors during normal operations:
// - ErrIncompatibleVersionChange if replicating the Parent Snapshot into a Snapshot
// with the specified `protocolVersion` is not supported.
func (model *Modelv0) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, error) {
func (model *Modelv0) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error) {
currentVersion := model.GetProtocolStateVersion()
if currentVersion == protocolVersion {
// no need for migration, return a complete copy
return clone.Clone(model), nil
return clone.Clone(model), nil, nil
}
if protocolVersion != 1 {
return nil, fmt.Errorf("unsupported replication version %d, expect %d: %w",
return nil, nil, fmt.Errorf("unsupported replication version %d, expect %d: %w",
protocolVersion, 1, ErrIncompatibleVersionChange)
}

Expand All @@ -90,9 +92,9 @@ func (model *Modelv0) Replicate(protocolVersion uint64) (protocol_state.KVStoreM
Modelv0: clone.Clone(*model),
}
if v1.GetProtocolStateVersion() != protocolVersion {
return nil, fmt.Errorf("sanity check: replicate resulted in unexpected version (%d != %d)", v1.GetProtocolStateVersion(), protocolVersion)
return nil, nil, fmt.Errorf("sanity check: replicate resulted in unexpected version (%d != %d)", v1.GetProtocolStateVersion(), protocolVersion)
}
return v1, nil
return v1, nil, nil
}

// VersionedEncode encodes the key-value store, returning the version separately
Expand Down Expand Up @@ -170,16 +172,16 @@ func (model *Modelv1) ID() flow.Identifier {
// Expected errors during normal operations:
// - ErrIncompatibleVersionChange if replicating the Parent Snapshot into a Snapshot
// with the specified `protocolVersion` is not supported.
func (model *Modelv1) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, error) {
func (model *Modelv1) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error) {
currentVersion := model.GetProtocolStateVersion()
if currentVersion == protocolVersion {
// no need for migration, return a complete copy
return clone.Clone(model), nil
return clone.Clone(model), nil, nil
}
nextVersion := currentVersion + 1
if protocolVersion != nextVersion {
// can only Replicate into model with numerically consecutive version
return nil, fmt.Errorf("unsupported replication version %d, expect %d: %w",
return nil, nil, fmt.Errorf("unsupported replication version %d, expect %d: %w",
protocolVersion, nextVersion, ErrIncompatibleVersionChange) // report the version actually accepted (currentVersion+1), not a hard-coded 1
}

Expand All @@ -188,9 +190,40 @@ func (model *Modelv1) Replicate(protocolVersion uint64) (protocol_state.KVStoreM
Modelv1: clone.Clone(*model),
}
if v2.GetProtocolStateVersion() != protocolVersion {
return nil, fmt.Errorf("sanity check: replicate resulted in unexpected version (%d != %d)", v2.GetProtocolStateVersion(), protocolVersion)
return nil, nil, fmt.Errorf("sanity check: replicate resulted in unexpected version (%d != %d)", v2.GetProtocolStateVersion(), protocolVersion)
}
return v2, nil

// dbMigration rewrites every stored per-epoch DKG end state (v1 numeric encoding)
// as the corresponding flow.DKGState value used by the v2 model. It operates on the
// badger transaction carried by `tx` and returns the first error it encounters.
dbMigration := func(tx *transaction.Tx) error {
	epochs, states, err := operation.RetrieveStoredEpochStates()(tx.DBTxn)
	if err != nil {
		return fmt.Errorf("could not retrieve stored epoch states: %w", err)
	}
	// `states` is indexed by position in `epochs` below; reject a length mismatch
	// up front so it surfaces as an error rather than an index-out-of-range panic.
	if len(epochs) != len(states) {
		return fmt.Errorf("inconsistent stored epoch states: %d epochs but %d states", len(epochs), len(states))
	}

	// convertState maps the numeric v1 DKG end state onto the v2 flow.DKGState.
	convertState := func(state uint32) flow.DKGState {
		switch state {
		case 0: // DKGEndStateUnknown
			return flow.DKGStateUninitialized
		case 1: // DKGEndStateSuccess
			return flow.RandomBeaconKeyCommitted
		case 2, 3, 4: // DKGEndStateInconsistentKey, DKGEndStateNoKey, DKGEndStateDKGFailure
			return flow.DKGStateFailure
		default:
			// NOTE(review): values outside 0-4 are silently treated as uninitialized;
			// confirm this is intended rather than a sign of corrupted data.
			return flow.DKGStateUninitialized
		}
	}

	for i, epoch := range epochs {
		migrated := convertState(states[i])
		if err := operation.UpsertDKGStateForEpoch(epoch, migrated)(tx.DBTxn); err != nil {
			return fmt.Errorf("could not upsert DKG state for epoch %d: %w", epoch, err)
		}
	}
	return nil
}

return v2, dbMigration, nil
}

// VersionedEncode encodes the key-value store, returning the version separately
Expand Down Expand Up @@ -228,13 +261,13 @@ func (model *Modelv2) ID() flow.Identifier {
// Expected errors during normal operations:
// - ErrIncompatibleVersionChange if replicating the Parent Snapshot into a Snapshot
// with the specified `protocolVersion` is not supported.
func (model *Modelv2) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, error) {
func (model *Modelv2) Replicate(protocolVersion uint64) (protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error) {
currentVersion := model.GetProtocolStateVersion()
if currentVersion == protocolVersion {
// no need for migration, return a complete copy
return clone.Clone(model), nil
return clone.Clone(model), nil, nil
} else {
return nil, fmt.Errorf("unsupported replication version %d: %w",
return nil, nil, fmt.Errorf("unsupported replication version %d: %w",
protocolVersion, ErrIncompatibleVersionChange)
}
}
Expand Down
18 changes: 9 additions & 9 deletions state/protocol/protocol_state/kvstore/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) {
},
},
}
cpy, err := model.Replicate(model.GetProtocolStateVersion())
cpy, _, err := model.Replicate(model.GetProtocolStateVersion())
require.NoError(t, err)
require.True(t, reflect.DeepEqual(model, cpy)) // expect the same model
require.Equal(t, cpy.ID(), model.ID())
Expand All @@ -132,7 +132,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) {
},
},
}
newVersion, err := model.Replicate(1)
newVersion, _, err := model.Replicate(1)
require.NoError(t, err)
require.Equal(t, uint64(1), newVersion.GetProtocolStateVersion())
require.NotEqual(t, newVersion.ID(), model.ID(), "two models with the same data but different version must have different ID")
Expand All @@ -142,7 +142,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) {
})
t.Run("v0-invalid-upgrade", func(t *testing.T) {
model := &kvstore.Modelv0{}
newVersion, err := model.Replicate(model.GetProtocolStateVersion() + 10)
newVersion, _, err := model.Replicate(model.GetProtocolStateVersion() + 10)
require.ErrorIs(t, err, kvstore.ErrIncompatibleVersionChange)
require.Nil(t, newVersion)
})
Expand All @@ -158,7 +158,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) {
EpochStateID: unittest.IdentifierFixture(),
},
}
cpy, err := model.Replicate(model.GetProtocolStateVersion())
cpy, _, err := model.Replicate(model.GetProtocolStateVersion())
require.NoError(t, err)
require.True(t, reflect.DeepEqual(model, cpy))

Expand All @@ -171,7 +171,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) {
model.GetProtocolStateVersion() - 1,
model.GetProtocolStateVersion() + 10,
} {
newVersion, err := model.Replicate(version)
newVersion, _, err := model.Replicate(version)
require.ErrorIs(t, err, kvstore.ErrIncompatibleVersionChange)
require.Nil(t, newVersion)
}
Expand All @@ -187,7 +187,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) {
},
},
}
newVersion, err := model.Replicate(2)
newVersion, _, err := model.Replicate(2)
require.NoError(t, err)
require.Equal(t, uint64(2), newVersion.GetProtocolStateVersion())
require.NotEqual(t, newVersion.ID(), model.ID(), "two models with the same data but different version must have different ID")
Expand All @@ -203,7 +203,7 @@ func TestKVStoreAPI_Replicate(t *testing.T) {
model.GetProtocolStateVersion() + 1,
model.GetProtocolStateVersion() + 10,
} {
newVersion, err := model.Replicate(version)
newVersion, _, err := model.Replicate(version)
require.ErrorIs(t, err, kvstore.ErrIncompatibleVersionChange)
require.Nil(t, newVersion)
}
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestKVStoreMutator_SetEpochExtensionViewCount(t *testing.T) {
t.Run("happy-path", func(t *testing.T) {
store, err := kvstore.NewDefaultKVStore(safetyParams.FinalizationSafetyThreshold, safetyParams.EpochExtensionViewCount, epochStateID)
require.NoError(t, err)
mutator, err := store.Replicate(store.GetProtocolStateVersion())
mutator, _, err := store.Replicate(store.GetProtocolStateVersion())
require.NoError(t, err)

newValue := safetyParams.FinalizationSafetyThreshold*2 + 1
Expand All @@ -280,7 +280,7 @@ func TestKVStoreMutator_SetEpochExtensionViewCount(t *testing.T) {
t.Run("invalid-value", func(t *testing.T) {
store, err := kvstore.NewDefaultKVStore(safetyParams.FinalizationSafetyThreshold, safetyParams.EpochExtensionViewCount, epochStateID)
require.NoError(t, err)
mutator, err := store.Replicate(store.GetProtocolStateVersion())
mutator, _, err := store.Replicate(store.GetProtocolStateVersion())
require.NoError(t, err)

oldValue := mutator.GetEpochExtensionViewCount()
Expand Down
23 changes: 17 additions & 6 deletions state/protocol/protocol_state/mock/kv_store_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *StateMutatorSuite) SetupTest() {
s.parentState.On("GetProtocolStateVersion").Return(s.latestProtocolVersion)
s.parentState.On("GetVersionUpgrade").Return(nil) // no version upgrade by default
s.parentState.On("ID").Return(unittest.IdentifierFixture(), nil)
s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil)
s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil, nil)

// state replicated from the parent state; by default exactly the same as the parent state
// CAUTION: ID of evolving state must be defined by the tests.
Expand Down Expand Up @@ -274,7 +274,7 @@ func (s *StateMutatorSuite) Test_VersionUpgrade() {
Data: newVersion,
ActivationView: s.candidate.View + 1,
}).Once()
s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil)
s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil, nil)
s.evolvingState.On("ID").Return(parentStateID, nil).Once()

for i := range s.kvStateMachines {
Expand All @@ -295,7 +295,7 @@ func (s *StateMutatorSuite) Test_VersionUpgrade() {
Data: newVersion,
ActivationView: s.candidate.View,
}).Once()
s.parentState.On("Replicate", newVersion).Return(&s.evolvingState, nil)
s.parentState.On("Replicate", newVersion).Return(&s.evolvingState, nil, nil)
s.evolvingState.On("ID").Return(newStateID, nil).Once()

for i := range s.kvStateMachines {
Expand All @@ -318,7 +318,7 @@ func (s *StateMutatorSuite) Test_VersionUpgrade() {
Data: newVersion,
ActivationView: s.candidate.View - 1,
}).Once()
s.parentState.On("Replicate", newVersion).Return(&s.evolvingState, nil)
s.parentState.On("Replicate", newVersion).Return(&s.evolvingState, nil, nil)
s.evolvingState.On("ID").Return(newStateID, nil).Once()

for i := range s.kvStateMachines {
Expand All @@ -339,7 +339,7 @@ func (s *StateMutatorSuite) Test_VersionUpgrade() {
Data: s.latestProtocolVersion,
ActivationView: s.candidate.View - 1,
}).Once()
s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil)
s.parentState.On("Replicate", s.latestProtocolVersion).Return(&s.evolvingState, nil, nil)
s.evolvingState.On("ID").Return(parentStateID, nil).Once()

for i := range s.kvStateMachines {
Expand Down Expand Up @@ -414,7 +414,7 @@ func (s *StateMutatorSuite) Test_ReplicateFails() {
s.parentState = *protocol_statemock.NewKVStoreAPI(s.T())
s.parentState.On("GetProtocolStateVersion").Return(s.latestProtocolVersion)
s.parentState.On("GetVersionUpgrade").Return(nil).Once()
s.parentState.On("Replicate", s.latestProtocolVersion).Return(nil, exception).Once()
s.parentState.On("Replicate", s.latestProtocolVersion).Return(nil, nil, exception).Once()

// `SetupTest` initializes the mock factories to expect to be called, so we overwrite the mocks here:
s.kvStateMachineFactories[0] = *protocol_statemock.NewKeyValueStoreStateMachineFactory(s.T())
Expand Down
35 changes: 25 additions & 10 deletions state/protocol/protocol_state/state/protocol_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,21 @@ func (s *MutableProtocolState) EvolveState(
return flow.ZeroID, nil, fmt.Errorf("extracting service events from candidate seals failed: %w", err)
}

parentStateID, stateMachines, evolvingState, err := s.initializeOrthogonalStateMachines(parentBlockID, candidateView)
parentStateID, stateMachines, evolvingState, dbMigrationOp, err := s.initializeOrthogonalStateMachines(parentBlockID, candidateView)
if err != nil {
return flow.ZeroID, nil, fmt.Errorf("failure initializing sub-state machines for evolving the Protocol State: %w", err)
}
blockPersistDbUpdates := transaction.NewDeferredBlockPersist()
if dbMigrationOp != nil {
blockPersistDbUpdates.AddDbOp(dbMigrationOp)
}

resultingStateID, dbUpdates, err := s.build(parentStateID, stateMachines, serviceEvents, evolvingState)
if err != nil {
return flow.ZeroID, nil, fmt.Errorf("evolving and building the resulting Protocol State failed: %w", err)
}
return resultingStateID, dbUpdates, nil
blockPersistDbUpdates.AddIndexingOps(dbUpdates.Pending())
return resultingStateID, blockPersistDbUpdates, nil
}

// initializeOrthogonalStateMachines instantiates the sub-state machines that in aggregate evolve the protocol state.
Expand All @@ -232,13 +237,13 @@ func (s *MutableProtocolState) EvolveState(
func (s *MutableProtocolState) initializeOrthogonalStateMachines(
parentBlockID flow.Identifier,
candidateView uint64,
) (flow.Identifier, []protocol_state.KeyValueStoreStateMachine, protocol_state.KVStoreMutator, error) {
) (flow.Identifier, []protocol_state.KeyValueStoreStateMachine, protocol_state.KVStoreMutator, transaction.DeferredDBUpdate, error) {
parentState, err := s.kvStoreSnapshots.ByBlockID(parentBlockID)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return flow.ZeroID, nil, nil, irrecoverable.NewExceptionf("Protocol State at parent block %v was not found: %w", parentBlockID, err)
return flow.ZeroID, nil, nil, nil, irrecoverable.NewExceptionf("Protocol State at parent block %v was not found: %w", parentBlockID, err)
}
return flow.ZeroID, nil, nil, fmt.Errorf("unexpected exception while retrieving Protocol State at parent block %v: %w", parentBlockID, err)
return flow.ZeroID, nil, nil, nil, fmt.Errorf("unexpected exception while retrieving Protocol State at parent block %v: %w", parentBlockID, err)
}

protocolVersion := parentState.GetProtocolStateVersion()
Expand All @@ -248,23 +253,33 @@ func (s *MutableProtocolState) initializeOrthogonalStateMachines(
}
}

evolvingState, err := parentState.Replicate(protocolVersion)
evolvingState, dbMigrationOp, err := parentState.Replicate(protocolVersion)
if err != nil {
if errors.Is(err, kvstore.ErrIncompatibleVersionChange) {
return flow.ZeroID, nil, nil, irrecoverable.NewExceptionf("replicating parent block's protocol state failed due to unsupported version: %w", err)
return flow.ZeroID, nil, nil, nil, irrecoverable.NewExceptionf("replicating parent block's protocol state failed due to unsupported version: %w", err)
}
return flow.ZeroID, nil, nil, nil, fmt.Errorf("could not replicate parent KV store (version=%d) to protocol version %d: %w", parentState.GetProtocolStateVersion(), protocolVersion, err)
}
if dbMigrationOp != nil {
	// Wrap the migration returned by Replicate so that a failure names the versions involved.
	// The original operation must be held in its own variable: a function literal captures
	// variables by reference, so calling `dbMigrationOp` from inside the function that is
	// assigned to `dbMigrationOp` would invoke the wrapper itself and recurse without bound.
	migrateStorage := dbMigrationOp
	dbMigrationOp = func(tx *transaction.Tx) error {
		if err := migrateStorage(tx); err != nil {
			return fmt.Errorf("could not migrate storage from %d to %d: %w",
				parentState.GetProtocolStateVersion(), protocolVersion, err)
		}
		return nil
	}
return flow.ZeroID, nil, nil, fmt.Errorf("could not replicate parent KV store (version=%d) to protocol version %d: %w", parentState.GetProtocolStateVersion(), protocolVersion, err)
}

stateMachines := make([]protocol_state.KeyValueStoreStateMachine, 0, len(s.kvStateMachineFactories))
for _, factory := range s.kvStateMachineFactories {
stateMachine, err := factory.Create(candidateView, parentBlockID, parentState, evolvingState)
if err != nil {
return flow.ZeroID, nil, nil, fmt.Errorf("could not create state machine: %w", err)
return flow.ZeroID, nil, nil, nil, fmt.Errorf("could not create state machine: %w", err)
}
stateMachines = append(stateMachines, stateMachine)
}
return parentState.ID(), stateMachines, evolvingState, nil
return parentState.ID(), stateMachines, evolvingState, dbMigrationOp, nil
}

// serviceEventsFromSeals arranges the sealed results in order of increasing height of the executed blocks
Expand Down
Loading

0 comments on commit 4f59be0

Please sign in to comment.