Skip to content

Commit

Permalink
Self-healing (#2400)
Browse files Browse the repository at this point in the history
## Motivation
See spacemeshos/SMIPS#46

Requires #2394, #2393, #2357
Closes #2203
Closes #2687

## Changes
- adds zdist param: whereas hdist is the "Hare lookback" distance (the number of layers for which we consider hare results/local input vector rather than global opinion), zdist is the "Hare result wait" distance (the number of layers we're willing to wait for Hare to finish before reverting to invalidating layers with no input vector)
- Hare explicitly reports failed CPs to the mesh, with the results stored in memory

## Test Plan
TBD

## TODO
<!-- This section should be removed when all items are complete -->
- [x] Explain motivation or link existing issue(s)
- [x] Test changes and document test plan
- [x] Finish rescoring block goodness after healing
- [x] Rescoring block unit test
- [x] Block voting weight unit test
- [x] Make sure healing -> verifying tortoise handoff is working
- [x] App test (for decay in active set size)
- [x] Split-then-rejoin test (two way split, three way split w/o majority fork)
- [x] Late blocks unit test(s)
- [x] Correctly weight votes from blocks with late atxs/those with a bad beacon value (will follow up in separate issue: #2540)
- [x] Multi tortoise unit tests

## DevOps Notes
<!-- Please uncheck these items as applicable to make DevOps aware of changes that may affect releases -->
- [x] This PR does not require configuration changes (e.g., environment variables, GitHub secrets, VM resources)
- [x] This PR does not affect public APIs
- [x] This PR does not rely on a new version of external services (PoET, elasticsearch, etc.)
- [ ] This PR does not make changes to log messages (which monitoring infrastructure may rely on)
  • Loading branch information
lrettig committed Aug 26, 2021
1 parent 389ab13 commit 00ca8ca
Show file tree
Hide file tree
Showing 85 changed files with 5,355 additions and 1,598 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v1
with:
go-version: '1.14.13'
go-version: '1.15.13'

- if: matrix.os == 'windows-latest'
name: Install make in windows
Expand Down
2 changes: 1 addition & 1 deletion .run/Template Go Test.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
<option name="RunConfigurationTask" enabled="true" run_configuration_name="go-env-test" run_configuration_type="MAKEFILE_TARGET_RUN_CONFIGURATION" />
</method>
</configuration>
</component>
</component>
8 changes: 4 additions & 4 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,14 @@ func (b *Builder) StartSmeshing(ctx context.Context, coinbase types.Address, opt
doneChan, err := b.postSetupProvider.StartSession(opts)
if err != nil {
b.status = smeshingStatusIdle
return fmt.Errorf("failed to start Post setup session: %v", err)
return fmt.Errorf("failed to start post setup session: %w", err)
}

go func() {
<-doneChan
if s := b.postSetupProvider.Status(); s.State != postSetupStateComplete {
b.status = smeshingStatusIdle
b.log.Error("failed to complete Post setup: %v", b.postSetupProvider.LastError())
b.log.With().Error("failed to complete post setup", log.Err(b.postSetupProvider.LastError()))
return
}

Expand Down Expand Up @@ -300,7 +300,7 @@ func (b *Builder) loop(ctx context.Context) {
// TODO(moshababo): don't generate the commitment every time smeshing is starting, but once only.
b.initialPost, _, err = b.postSetupProvider.GenerateProof(shared.ZeroChallenge)
if err != nil {
b.log.Error("Post execution failed: %v", err)
b.log.Error("post execution failed: %v", err)
b.status = smeshingStatusIdle
return
}
Expand Down Expand Up @@ -542,7 +542,7 @@ func (b *Builder) createAtx(ctx context.Context) (*types.ActivationTx, error) {
log.FieldNamed("current_layer", b.layerClock.GetCurrentLayer()),
)
if err := b.waitOrStop(ctx, b.layerClock.AwaitLayer(pubEpoch.FirstLayer())); err != nil {
return nil, fmt.Errorf("failed to wait of publication epoch: %w", err)
return nil, fmt.Errorf("failed to wait for publication epoch: %w", err)
}
b.log.Info("publication epoch has arrived!")
if discarded := b.discardChallengeIfStale(); discarded {
Expand Down
92 changes: 39 additions & 53 deletions activation/activationdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/spacemeshos/ed25519"
Expand Down Expand Up @@ -52,19 +51,20 @@ func createLayerWithAtx2(t require.TestingT, msh *mesh.Mesh, id types.LayerID, n

type MeshValidatorMock struct{}

func (m *MeshValidatorMock) Persist() error {
func (m *MeshValidatorMock) Persist(context.Context) error {
return nil
}

func (m *MeshValidatorMock) LatestComplete() types.LayerID {
panic("implement me")
}

func (m *MeshValidatorMock) HandleIncomingLayer(layer *types.Layer) (types.LayerID, types.LayerID) {
return layer.Index().Sub(1), layer.Index()
func (m *MeshValidatorMock) HandleIncomingLayer(_ context.Context, layerID types.LayerID) (types.LayerID, types.LayerID, bool) {
return layerID.Sub(1), layerID, false
}
func (m *MeshValidatorMock) HandleLateBlock(bl *types.Block) (types.LayerID, types.LayerID) {
return bl.Layer().Sub(1), bl.Layer()

func (m *MeshValidatorMock) HandleLateBlocks(_ context.Context, blocks []*types.Block) (types.LayerID, types.LayerID) {
return blocks[0].Layer().Sub(1), blocks[0].Layer()
}

type MockState struct{}
Expand All @@ -73,7 +73,7 @@ func (MockState) GetAllAccounts() (*types.MultipleAccountsState, error) {
panic("implement me")
}

func (MockState) ValidateAndAddTxToPool(tx *types.Transaction) error {
func (MockState) ValidateAndAddTxToPool(*types.Transaction) error {
panic("implement me")
}

Expand Down Expand Up @@ -104,32 +104,17 @@ func (MockState) AddressExists(types.Address) bool {
return true
}

func (MockState) GetLayerStateRoot(layer types.LayerID) (types.Hash32, error) {
func (MockState) GetLayerStateRoot(types.LayerID) (types.Hash32, error) {
panic("implement me")
}

func (MockState) GetBalance(addr types.Address) uint64 {
func (MockState) GetBalance(types.Address) uint64 {
panic("implement me")
}
func (MockState) GetNonce(addr types.Address) uint64 {
func (MockState) GetNonce(types.Address) uint64 {
panic("implement me")
}

type ATXDBMock struct {
mock.Mock
counter int
workSymLock sync.Mutex
activeSet uint32
}

func (mock *ATXDBMock) CalcMinerWeights(types.EpochID, map[types.BlockID]struct{}) (map[string]uint64, error) {
mock.workSymLock.Lock()
defer mock.workSymLock.Unlock()

mock.counter++
return map[string]uint64{"aaaaac": 1, "aaabddb": 2, "aaaccc": 3}, nil
}

type MockTxMemPool struct{}

func (MockTxMemPool) Get(types.TransactionID) (*types.Transaction, error) {
Expand All @@ -139,15 +124,6 @@ func (MockTxMemPool) Get(types.TransactionID) (*types.Transaction, error) {
func (MockTxMemPool) Put(types.TransactionID, *types.Transaction) {}
func (MockTxMemPool) Invalidate(types.TransactionID) {}

type MockAtxMemPool struct{}

func (MockAtxMemPool) Get(types.ATXID) (*types.ActivationTx, error) {
return &types.ActivationTx{}, nil
}

func (MockAtxMemPool) Put(*types.ActivationTx) {}
func (MockAtxMemPool) Invalidate(types.ATXID) {}

func ConfigTst() mesh.Config {
return mesh.Config{
BaseReward: big.NewInt(5000),
Expand All @@ -171,14 +147,24 @@ func rndStr() string {
return string(a)
}

func createLayerWithAtx(t *testing.T, msh *mesh.Mesh, id types.LayerID, numOfBlocks int, atxs []*types.ActivationTx, votes []types.BlockID, views []types.BlockID) (created []types.BlockID) {
func processAtxs(db *DB, atxs []*types.ActivationTx) error {
for _, atx := range atxs {
err := db.ProcessAtx(atx)
if err != nil {
return err
}
}
return nil
}

func createLayerWithAtx(t *testing.T, msh *mesh.Mesh, atxdb *DB, id types.LayerID, numOfBlocks int, atxs []*types.ActivationTx, votes []types.BlockID, views []types.BlockID) (created []types.BlockID) {
if numOfBlocks < len(atxs) {
panic("not supported")
}
for i := 0; i < numOfBlocks; i++ {
block1 := types.NewExistingBlock(id, []byte(rand.String(8)), nil)
block1.ForDiff = append(block1.ForDiff, votes...)
activeSet := []types.ATXID{}
var activeSet []types.ATXID
if i < len(atxs) {
activeSet = append(activeSet, atxs[i].ID())
}
Expand All @@ -196,7 +182,7 @@ func createLayerWithAtx(t *testing.T, msh *mesh.Mesh, id types.LayerID, numOfBlo
if i < len(atxs) {
actualAtxs = atxs[i : i+1]
}
msh.ProcessAtxs(actualAtxs)
require.NoError(t, processAtxs(atxdb, actualAtxs))
block1.Initialize()
err := msh.AddBlockWithTxs(context.TODO(), block1)
require.NoError(t, err)
Expand Down Expand Up @@ -231,14 +217,14 @@ func TestATX_ActiveSetForLayerView(t *testing.T) {
assert.NoError(t, err)
atx.NIPost = NewNIPostWithChallenge(hash, poetRef)
}
blocks := createLayerWithAtx(t, layers, types.NewLayerID(1), 6, atxs, []types.BlockID{}, []types.BlockID{})
blocks := createLayerWithAtx(t, layers, atxdb, types.NewLayerID(1), 6, atxs, []types.BlockID{}, []types.BlockID{})
before := blocks[:2]
two := blocks[2:3]
after := blocks[3:]
for i := uint32(2); i <= 10; i++ {
before = createLayerWithAtx(t, layers, types.NewLayerID(i), 1, []*types.ActivationTx{}, before, before)
two = createLayerWithAtx(t, layers, types.NewLayerID(i), 1, []*types.ActivationTx{}, two, two)
after = createLayerWithAtx(t, layers, types.NewLayerID(i), 1, []*types.ActivationTx{}, after, after)
before = createLayerWithAtx(t, layers, atxdb, types.NewLayerID(i), 1, []*types.ActivationTx{}, before, before)
two = createLayerWithAtx(t, layers, atxdb, types.NewLayerID(i), 1, []*types.ActivationTx{}, two, two)
after = createLayerWithAtx(t, layers, atxdb, types.NewLayerID(i), 1, []*types.ActivationTx{}, after, after)
}
for _, x := range before {
blocksMap[x] = struct{}{}
Expand Down Expand Up @@ -378,7 +364,7 @@ func TestMesh_processBlockATXs(t *testing.T) {
atx.NIPost = NewNIPostWithChallenge(hash, poetRef)
}

err = atxdb.ProcessAtxs(atxs)
err = processAtxs(atxdb, atxs)
assert.NoError(t, err)

// check that further atxs dont affect current epoch count
Expand All @@ -392,7 +378,7 @@ func TestMesh_processBlockATXs(t *testing.T) {
assert.NoError(t, err)
atx.NIPost = NewNIPostWithChallenge(hash, poetRef)
}
err = atxdb.ProcessAtxs(atxs2)
err = processAtxs(atxdb, atxs2)
assert.NoError(t, err)

assertEpochWeight(t, atxdb, 2, 100*100*4) // 1 posATX + 3 from `atxs`
Expand Down Expand Up @@ -430,9 +416,9 @@ func TestActivationDB_ValidateAtx(t *testing.T) {
atx.NIPost = NewNIPostWithChallenge(hash, poetRef)
}

blocks := createLayerWithAtx(t, layers, types.NewLayerID(1), 10, atxs, []types.BlockID{}, []types.BlockID{})
blocks = createLayerWithAtx(t, layers, types.NewLayerID(10), 10, []*types.ActivationTx{}, blocks, blocks)
blocks = createLayerWithAtx(t, layers, types.NewLayerID(100), 10, []*types.ActivationTx{}, blocks, blocks)
blocks := createLayerWithAtx(t, layers, atxdb, types.NewLayerID(1), 10, atxs, []types.BlockID{}, []types.BlockID{})
blocks = createLayerWithAtx(t, layers, atxdb, types.NewLayerID(10), 10, []*types.ActivationTx{}, blocks, blocks)
blocks = createLayerWithAtx(t, layers, atxdb, types.NewLayerID(100), 10, []*types.ActivationTx{}, blocks, blocks)

prevAtx := newActivationTx(idx1, 0, *types.EmptyATXID, *types.EmptyATXID, types.NewLayerID(100), 0, 100, coinbase1, 100, &types.NIPost{})
hash, err := prevAtx.NIPostChallenge.Hash()
Expand Down Expand Up @@ -475,9 +461,9 @@ func TestActivationDB_ValidateAtxErrors(t *testing.T) {
newActivationTx(id3, 0, *types.EmptyATXID, *types.EmptyATXID, types.NewLayerID(1), 0, 100, coinbase, 100, &types.NIPost{}),
}

blocks := createLayerWithAtx(t, layers, types.NewLayerID(1), 10, atxs, []types.BlockID{}, []types.BlockID{})
blocks = createLayerWithAtx(t, layers, types.NewLayerID(10), 10, []*types.ActivationTx{}, blocks, blocks)
blocks = createLayerWithAtx(t, layers, types.NewLayerID(100), 10, []*types.ActivationTx{}, blocks, blocks)
blocks := createLayerWithAtx(t, layers, atxdb, types.NewLayerID(1), 10, atxs, []types.BlockID{}, []types.BlockID{})
blocks = createLayerWithAtx(t, layers, atxdb, types.NewLayerID(10), 10, []*types.ActivationTx{}, blocks, blocks)
blocks = createLayerWithAtx(t, layers, atxdb, types.NewLayerID(100), 10, []*types.ActivationTx{}, blocks, blocks)

chlng := types.HexToHash32("0x3333")
poetRef := []byte{0xba, 0xbe}
Expand Down Expand Up @@ -651,9 +637,9 @@ func TestActivationDB_ValidateAndInsertSorted(t *testing.T) {
newActivationTx(id3, 0, *types.EmptyATXID, *types.EmptyATXID, types.NewLayerID(1), 0, 100, coinbase, 100, &types.NIPost{}),
}

blocks := createLayerWithAtx(t, layers, types.NewLayerID(1), 10, atxs, []types.BlockID{}, []types.BlockID{})
blocks = createLayerWithAtx(t, layers, types.NewLayerID(10), 10, []*types.ActivationTx{}, blocks, blocks)
blocks = createLayerWithAtx(t, layers, types.NewLayerID(100), 10, []*types.ActivationTx{}, blocks, blocks)
blocks := createLayerWithAtx(t, layers, atxdb, types.NewLayerID(1), 10, atxs, []types.BlockID{}, []types.BlockID{})
blocks = createLayerWithAtx(t, layers, atxdb, types.NewLayerID(10), 10, []*types.ActivationTx{}, blocks, blocks)
blocks = createLayerWithAtx(t, layers, atxdb, types.NewLayerID(100), 10, []*types.ActivationTx{}, blocks, blocks)

chlng := types.HexToHash32("0x3333")
poetRef := []byte{0x56, 0xbe}
Expand Down Expand Up @@ -833,7 +819,7 @@ func BenchmarkNewActivationDb(b *testing.B) {
pPrevAtxs := make([]types.ATXID, numOfMiners)
posAtx := prevAtxID
var atx *types.ActivationTx
layer := types.LayerID(postGenesisEpochLayer)
layer := postGenesisEpochLayer

start := time.Now()
eStart := time.Now()
Expand Down
32 changes: 6 additions & 26 deletions activation/atxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,26 +160,6 @@ func (db *DB) UnsubscribeAtx(id types.ATXID) {
}
}

// ProcessAtxs processes the list of given atxs using ProcessAtx method
func (db *DB) ProcessAtxs(atxs []*types.ActivationTx) error {
// seenMinerIds := map[string]struct{}{}
for _, atx := range atxs {
/* minerID := atx.NodeID.Key
if _, found := seenMinerIds[minerID]; found {
// TODO: Blacklist this miner
// TODO: Ensure that these are two different, syntactically valid ATXs for the same epoch, otherwise the
// miner did nothing wrong
db.log.With().Error("found miner with multiple ATXs published in same block",
log.FieldNamed("atx_node_id", atx.NodeID), atx.ID())
}*/
err := db.ProcessAtx(atx)
if err != nil {
return err
}
}
return nil
}

// ProcessAtx validates the active set size declared in the atx, and contextually validates the atx according to atx
// validation rules it then stores the atx with flag set to validity of the atx.
//
Expand Down Expand Up @@ -386,11 +366,11 @@ func (db *DB) SyntacticallyValidateAtx(atx *types.ActivationTx) error {
return fmt.Errorf("failed to compute NIPost's expected challenge hash: %v", err)
}

db.log.With().Info("Validating NIPost", log.String("expected_challenge_hash", expectedChallengeHash.String()), atx.ID())
db.log.With().Info("validating nipost", log.String("expected_challenge_hash", expectedChallengeHash.String()), atx.ID())

pubKey := signing.NewPublicKey(util.Hex2Bytes(atx.NodeID.Key))
if err = db.nipostValidator.Validate(*pubKey, atx.NIPost, *expectedChallengeHash, atx.NumUnits); err != nil {
return fmt.Errorf("invalid NIPost: %v", err)
return fmt.Errorf("invalid nipost: %v", err)
}

return nil
Expand Down Expand Up @@ -766,7 +746,7 @@ func (db *DB) HandleAtxData(ctx context.Context, data []byte, fetcher service.Fe
}

if err := db.FetchAtxReferences(ctx, atx, fetcher); err != nil {
return fmt.Errorf("received ATX with missing references of prev or pos id %v, %v, %v, %v",
return fmt.Errorf("received atx with missing references of prev or pos id %v, %v, %v, %v",
atx.ID().ShortString(), atx.PrevATXID.ShortString(), atx.PositioningATX.ShortString(), log.Err(err))
}

Expand All @@ -790,19 +770,19 @@ func (db *DB) HandleAtxData(ctx context.Context, data []byte, fetcher service.Fe
func (db *DB) FetchAtxReferences(ctx context.Context, atx *types.ActivationTx, f service.Fetcher) error {
logger := db.log.WithContext(ctx)
if atx.PositioningATX != *types.EmptyATXID && atx.PositioningATX != db.goldenATXID {
logger.With().Info("going to fetch pos atx", atx.PositioningATX, atx.ID())
logger.With().Debug("going to fetch pos atx", atx.PositioningATX, atx.ID())
if err := f.FetchAtx(ctx, atx.PositioningATX); err != nil {
return err
}
}

if atx.PrevATXID != *types.EmptyATXID {
logger.With().Info("going to fetch prev atx", atx.PrevATXID, atx.ID())
logger.With().Debug("going to fetch prev atx", atx.PrevATXID, atx.ID())
if err := f.FetchAtx(ctx, atx.PrevATXID); err != nil {
return err
}
}
logger.With().Info("done fetching references for atx", atx.ID())
logger.With().Debug("done fetching references for atx", atx.ID())

return nil
}
Loading

0 comments on commit 00ca8ca

Please sign in to comment.