Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added mining for POS #3187

Merged
merged 20 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)

miningSync := stagedsync.New(
stagedsync.MiningStages(ctx,
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, tmpdir),
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, tmpdir),
stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, tmpdir),
stagedsync.StageHashStateCfg(db, tmpdir),
stagedsync.StageTrieCfg(db, false, true, tmpdir, getBlockReader(chainConfig)),
Expand Down
2 changes: 1 addition & 1 deletion cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
miner.MiningConfig.ExtraData = nextBlock.Extra()
miningStages.MockExecFunc(stages.MiningCreateBlock, func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx) error {
err = stagedsync.SpawnMiningCreateBlockStage(s, tx,
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, tmpDir),
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, tmpDir),
quit)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/rpcdaemontest/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g
ethashApi := apis[1].Service.(*ethash.API)
server := grpc.NewServer()

remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil))
remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, nil, false))
txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer)
txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi))
listener := bufconn.Listen(1024 * 1024)
Expand Down
10 changes: 7 additions & 3 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ var (
Name: "mine",
Usage: "Enable mining",
}
ProposingEnabledFlag = cli.BoolFlag{
Name: "proposer",
Usage: "Enable PoS proposer",
}
MinerNotifyFlag = cli.StringFlag{
Name: "miner.notify",
Usage: "Comma separated HTTP URL list to notify of new work packages",
Expand Down Expand Up @@ -1150,9 +1154,9 @@ func setParlia(ctx *cli.Context, cfg *params.ParliaConfig, datadir string) {
}

func setMiner(ctx *cli.Context, cfg *params.MiningConfig) {
if ctx.GlobalIsSet(MiningEnabledFlag.Name) {
cfg.Enabled = true
}
cfg.Enabled = ctx.GlobalIsSet(MiningEnabledFlag.Name)
cfg.EnabledPOS = ctx.GlobalIsSet(ProposingEnabledFlag.Name)

if cfg.Enabled && len(cfg.Etherbase.Bytes()) == 0 {
panic(fmt.Sprintf("Erigon supports only remote miners. Flag --%s or --%s is required", MinerNotifyFlag.Name, MinerSigningKeyFileFlag.Name))
}
Expand Down
58 changes: 54 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/ledgerwatch/erigon/eth/ethutils"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/node"
Expand Down Expand Up @@ -345,7 +346,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
backend.pendingBlocks = miner.PendingResultCh
backend.minedBlocks = miner.MiningResultCh
backend.reverseDownloadCh = make(chan privateapi.PayloadMessage)
backend.statusCh = make(chan privateapi.ExecutionStatus)
backend.statusCh = make(chan privateapi.ExecutionStatus, 1)

var blockReader interfaces.FullBlockReader
if config.Snapshot.Enabled {
Expand Down Expand Up @@ -390,10 +391,10 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
} else {
blockReader = snapshotsync.NewBlockReader()
}

// proof-of-work mining
mining := stagedsync.New(
stagedsync.MiningStages(backend.sentryCtx,
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, tmpdir),
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, nil, tmpdir),
stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir),
stagedsync.StageHashStateCfg(backend.chainDB, tmpdir),
stagedsync.StageTrieCfg(backend.chainDB, false, true, tmpdir, blockReader),
Expand All @@ -404,10 +405,38 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
if casted, ok := backend.engine.(*ethash.Ethash); ok {
ethashApi = casted.APIs(nil)[1].Service.(*ethash.API)
}
// proof-of-stake mining
assembleBlockPOS := func(random common.Hash, suggestedFeeRecipient common.Address, timestamp uint64) (*types.Block, error) {
miningStatePos := stagedsync.NewMiningState(&config.Miner)
proposingSync := stagedsync.New(
stagedsync.MiningStages(backend.sentryCtx,
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miningStatePos, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, &stagedsync.BlockProposerParametersPOS{
Random: random,
SuggestedFeeRecipient: suggestedFeeRecipient,
Timestamp: timestamp,
}, tmpdir),
stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir),
stagedsync.StageHashStateCfg(backend.chainDB, tmpdir),
stagedsync.StageTrieCfg(backend.chainDB, false, true, tmpdir, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit),
), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder)
// We start the mining step
if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync); err != nil {
return nil, err
}
block := <-miningStatePos.MiningResultPOSCh
return block, nil
}
atomic.StoreUint32(&backend.waitingForBeaconChain, 0)
// Initialize ethbackend
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events,
blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain)
blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain,
backend.sentryControlServer.Hd.SkipCycleHack, assembleBlockPOS, config.Miner.EnabledPOS)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)
// If we enabled the proposer flag we initiates the block proposing thread
if config.Miner.EnabledPOS {
ethBackendRPC.StartProposer()
}
if stack.Config().PrivateApiAddr != "" {
var creds credentials.TransportCredentials
if stack.Config().TLSConnection {
Expand Down Expand Up @@ -616,6 +645,12 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
var hasWork bool
errc := make(chan error, 1)

tx, err := s.chainDB.BeginRo(ctx)
if err != nil {
log.Warn("mining", "err", err)
return
}

for {
mineEvery.Reset(3 * time.Second)
select {
Expand All @@ -635,6 +670,21 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
case <-quitCh:
return
}
// Check if we transitioned and if we did halt POW mining
headNumber, err := stages.GetStageProgress(tx, stages.Headers)
if err != nil {
log.Warn("mining", "err", err)
return
}

isTrans, err := rawdb.Transitioned(tx, headNumber, s.chainConfig.TerminalTotalDifficulty)
if err != nil {
log.Warn("mining", "err", err)
return
}
if isTrans {
return
}

if !works && hasWork {
works = true
Expand Down
14 changes: 11 additions & 3 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,16 @@ func HeadersPOS(
) error {
// Waiting for the beacon chain
log.Info("Waiting for payloads...")
var payloadMessage privateapi.PayloadMessage
atomic.StoreUint32(cfg.waitingPosHeaders, 1)
payloadMessage := <-cfg.reverseDownloadCh
// Decide what kind of action we need to take place
select {
case payloadMessage = <-cfg.reverseDownloadCh:
case <-cfg.hd.SkipCycleHack:
atomic.StoreUint32(cfg.waitingPosHeaders, 0)
return nil
}

atomic.StoreUint32(cfg.waitingPosHeaders, 0)
header := payloadMessage.Header

Expand Down Expand Up @@ -173,14 +181,14 @@ func HeadersPOS(
return err
}
if parent != nil && parent.Hash() == header.ParentHash {
if err := cfg.hd.VerifyHeader(header); err != nil {
/*if err := cfg.hd.VerifyHeader(header); err != nil {
log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "error", err)
cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Invalid,
LatestValidHash: header.ParentHash,
}
return nil
}
}*/

// For the sake of simplicity we can just assume it will be valid for now.
// TODO(yperbasis): move to execution stage
Expand Down
94 changes: 64 additions & 30 deletions eth/stagedsync/stage_mining_create_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,50 @@ type MiningBlock struct {
}

type MiningState struct {
MiningConfig *params.MiningConfig
PendingResultCh chan *types.Block
MiningResultCh chan *types.Block
MiningBlock *MiningBlock
MiningConfig *params.MiningConfig
PendingResultCh chan *types.Block
MiningResultCh chan *types.Block
MiningResultPOSCh chan *types.Block
MiningBlock *MiningBlock
}

func NewMiningState(cfg *params.MiningConfig) MiningState {
return MiningState{
MiningConfig: cfg,
PendingResultCh: make(chan *types.Block, 1),
MiningResultCh: make(chan *types.Block, 1),
MiningBlock: &MiningBlock{},
MiningConfig: cfg,
PendingResultCh: make(chan *types.Block, 1),
MiningResultCh: make(chan *types.Block, 1),
MiningResultPOSCh: make(chan *types.Block, 1),
MiningBlock: &MiningBlock{},
}
}

type BlockProposerParametersPOS struct {
Random common.Hash
SuggestedFeeRecipient common.Address // For now, we apply a suggested recipient only if etherbase is unset
Timestamp uint64
}

type MiningCreateBlockCfg struct {
db kv.RwDB
miner MiningState
chainConfig params.ChainConfig
engine consensus.Engine
txPool2 *txpool.TxPool
txPool2DB kv.RoDB
tmpdir string
db kv.RwDB
miner MiningState
chainConfig params.ChainConfig
engine consensus.Engine
txPool2 *txpool.TxPool
txPool2DB kv.RoDB
tmpdir string
blockProposerParameters *BlockProposerParametersPOS
}

func StageMiningCreateBlockCfg(db kv.RwDB, miner MiningState, chainConfig params.ChainConfig, engine consensus.Engine, txPool2 *txpool.TxPool, txPool2DB kv.RoDB, tmpdir string) MiningCreateBlockCfg {
func StageMiningCreateBlockCfg(db kv.RwDB, miner MiningState, chainConfig params.ChainConfig, engine consensus.Engine, txPool2 *txpool.TxPool, txPool2DB kv.RoDB, blockProposerParameters *BlockProposerParametersPOS, tmpdir string) MiningCreateBlockCfg {
return MiningCreateBlockCfg{
db: db,
miner: miner,
chainConfig: chainConfig,
engine: engine,
txPool2: txPool2,
txPool2DB: txPool2DB,
tmpdir: tmpdir,
db: db,
miner: miner,
chainConfig: chainConfig,
engine: engine,
txPool2: txPool2,
txPool2DB: txPool2DB,
tmpdir: tmpdir,
blockProposerParameters: blockProposerParameters,
}
}

Expand All @@ -85,10 +95,6 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
staleThreshold = 7
)

if cfg.miner.MiningConfig.Etherbase == (common.Address{}) {
return fmt.Errorf("refusing to mine without etherbase")
}

logPrefix := s.LogPrefix()
executionAt, err := s.ExecutionAt(tx)
if err != nil {
Expand All @@ -99,6 +105,19 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
return fmt.Errorf(fmt.Sprintf("[%s] Empty block", logPrefix), "blocknum", executionAt)
}

isTrans, err := rawdb.Transitioned(tx, executionAt, cfg.chainConfig.TerminalTotalDifficulty)
if err != nil {
return err
}

if cfg.miner.MiningConfig.Etherbase == (common.Address{}) {
if !isTrans {
return fmt.Errorf("refusing to mine without etherbase")
}
// If we do not have an etherbase, let's use the suggested one
coinbase = cfg.blockProposerParameters.SuggestedFeeRecipient
}

blockNum := executionAt + 1
var txs []types.Transaction
if err = cfg.txPool2DB.View(context.Background(), func(tx kv.Tx) error {
Expand Down Expand Up @@ -161,10 +180,15 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
}

// re-written miner/worker.go:commitNewWork
timestamp := time.Now().Unix()
if parent.Time >= uint64(timestamp) {
timestamp = int64(parent.Time + 1)
var timestamp int64
// If we are on proof-of-stake timestamp should be already set for us
if !isTrans {
timestamp = time.Now().Unix()
if parent.Time >= uint64(timestamp) {
timestamp = int64(parent.Time + 1)
}
}

num := parent.Number
header := &types.Header{
ParentHash: parent.Hash(),
Expand Down Expand Up @@ -202,6 +226,16 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
return err
}

if isTrans {
// We apply pre-made fields
header.MixDigest = cfg.blockProposerParameters.Random
header.Time = cfg.blockProposerParameters.Timestamp

current.Header = header
current.Uncles = nil
return nil
}

// If we are care about TheDAO hard-fork check whether to override the extra-data or not
if daoBlock := cfg.chainConfig.DAOForkBlock; daoBlock != nil {
// Check whether the block is among the fork extra-override range
Expand Down
11 changes: 11 additions & 0 deletions eth/stagedsync/stage_mining_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/log/v3"
Expand Down Expand Up @@ -54,6 +55,16 @@ func SpawnMiningFinishStage(s *StageState, tx kv.RwTx, cfg MiningFinishCfg, quit
//}
//prev = sealHash

// If we are on POS, we will send the result on the POS channel
isTrans, err := rawdb.Transitioned(tx, block.Header().Number.Uint64(), cfg.chainConfig.TerminalTotalDifficulty)
if err != nil {
return err
}

if isTrans {
cfg.miningState.MiningResultPOSCh <- block
return nil
}
// Tests may set pre-calculated nonce
if block.NonceU64() != 0 {
cfg.miningState.MiningResultCh <- block
Expand Down
8 changes: 4 additions & 4 deletions ethdb/privateapi/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestMockDownloadRequest(t *testing.T) {
statusCh := make(chan ExecutionStatus)
waitingForHeaders := uint32(1)

backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false)

var err error
var reply *remote.EngineExecutePayloadReply
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestMockValidExecution(t *testing.T) {
statusCh := make(chan ExecutionStatus)
waitingForHeaders := uint32(1)

backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false)

var err error
var reply *remote.EngineExecutePayloadReply
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestMockInvalidExecution(t *testing.T) {
statusCh := make(chan ExecutionStatus)

waitingForHeaders := uint32(1)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false)

var err error
var reply *remote.EngineExecutePayloadReply
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestNoTTD(t *testing.T) {
statusCh := make(chan ExecutionStatus)
waitingForHeaders := uint32(1)

backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{}, reverseDownloadCh, statusCh, &waitingForHeaders)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false)

var err error

Expand Down
Loading