Skip to content

Commit

Permalink
Added mining for POS (#3187)
Browse files Browse the repository at this point in the history
* added comunication with channels

* added final mining

* removed trash

* bug fixing

* async calls

* one thread

* tests

* better comments

* no lock

* better placing of skipCycleHack

* removed long timer

* moved transitioned block processing

* better naming

* disabled updates on blocks

* sync.Cond

* 2 sync.Cond

* better locking

* Use single sync.Cond. Proposer shutdown

Co-authored-by: yperbasis <[email protected]>
  • Loading branch information
Giulio2002 and yperbasis authored Jan 4, 2022
1 parent 156287a commit 195eb9a
Show file tree
Hide file tree
Showing 14 changed files with 307 additions and 93 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)

miningSync := stagedsync.New(
stagedsync.MiningStages(ctx,
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, tmpdir),
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, tmpdir),
stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, tmpdir),
stagedsync.StageHashStateCfg(db, tmpdir),
stagedsync.StageTrieCfg(db, false, true, tmpdir, getBlockReader(chainConfig)),
Expand Down
2 changes: 1 addition & 1 deletion cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
miner.MiningConfig.ExtraData = nextBlock.Extra()
miningStages.MockExecFunc(stages.MiningCreateBlock, func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx) error {
err = stagedsync.SpawnMiningCreateBlockStage(s, tx,
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, tmpDir),
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, tmpDir),
quit)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/rpcdaemontest/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g
ethashApi := apis[1].Service.(*ethash.API)
server := grpc.NewServer()

remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil))
remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, nil, false))
txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer)
txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi))
listener := bufconn.Listen(1024 * 1024)
Expand Down
10 changes: 7 additions & 3 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ var (
Name: "mine",
Usage: "Enable mining",
}
ProposingEnabledFlag = cli.BoolFlag{
Name: "proposer",
Usage: "Enable PoS proposer",
}
MinerNotifyFlag = cli.StringFlag{
Name: "miner.notify",
Usage: "Comma separated HTTP URL list to notify of new work packages",
Expand Down Expand Up @@ -1150,9 +1154,9 @@ func setParlia(ctx *cli.Context, cfg *params.ParliaConfig, datadir string) {
}

func setMiner(ctx *cli.Context, cfg *params.MiningConfig) {
if ctx.GlobalIsSet(MiningEnabledFlag.Name) {
cfg.Enabled = true
}
cfg.Enabled = ctx.GlobalIsSet(MiningEnabledFlag.Name)
cfg.EnabledPOS = ctx.GlobalIsSet(ProposingEnabledFlag.Name)

if cfg.Enabled && len(cfg.Etherbase.Bytes()) == 0 {
panic(fmt.Sprintf("Erigon supports only remote miners. Flag --%s or --%s is required", MinerNotifyFlag.Name, MinerSigningKeyFileFlag.Name))
}
Expand Down
57 changes: 54 additions & 3 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/ledgerwatch/erigon/eth/ethutils"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/node"
Expand Down Expand Up @@ -390,11 +391,12 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
backend.pendingBlocks = miner.PendingResultCh
backend.minedBlocks = miner.MiningResultCh
backend.reverseDownloadCh = make(chan privateapi.PayloadMessage)
backend.statusCh = make(chan privateapi.ExecutionStatus)
backend.statusCh = make(chan privateapi.ExecutionStatus, 1)

// proof-of-work mining
mining := stagedsync.New(
stagedsync.MiningStages(backend.sentryCtx,
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, tmpdir),
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, nil, tmpdir),
stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir),
stagedsync.StageHashStateCfg(backend.chainDB, tmpdir),
stagedsync.StageTrieCfg(backend.chainDB, false, true, tmpdir, blockReader),
Expand All @@ -405,10 +407,38 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
if casted, ok := backend.engine.(*ethash.Ethash); ok {
ethashApi = casted.APIs(nil)[1].Service.(*ethash.API)
}
// proof-of-stake mining
assembleBlockPOS := func(random common.Hash, suggestedFeeRecipient common.Address, timestamp uint64) (*types.Block, error) {
miningStatePos := stagedsync.NewMiningState(&config.Miner)
proposingSync := stagedsync.New(
stagedsync.MiningStages(backend.sentryCtx,
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miningStatePos, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, &stagedsync.BlockProposerParametersPOS{
Random: random,
SuggestedFeeRecipient: suggestedFeeRecipient,
Timestamp: timestamp,
}, tmpdir),
stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir),
stagedsync.StageHashStateCfg(backend.chainDB, tmpdir),
stagedsync.StageTrieCfg(backend.chainDB, false, true, tmpdir, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit),
), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder)
// We start the mining step
if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync); err != nil {
return nil, err
}
block := <-miningStatePos.MiningResultPOSCh
return block, nil
}
atomic.StoreUint32(&backend.waitingForBeaconChain, 0)
// Initialize ethbackend
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events,
blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain)
blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain,
backend.sentryControlServer.Hd.SkipCycleHack, assembleBlockPOS, config.Miner.EnabledPOS)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)
// If we enabled the proposer flag we initiates the block proposing thread
if config.Miner.EnabledPOS {
ethBackendRPC.StartProposer()
}
if stack.Config().PrivateApiAddr != "" {
var creds credentials.TransportCredentials
if stack.Config().TLSConnection {
Expand Down Expand Up @@ -617,6 +647,12 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
var hasWork bool
errc := make(chan error, 1)

tx, err := s.chainDB.BeginRo(ctx)
if err != nil {
log.Warn("mining", "err", err)
return
}

for {
mineEvery.Reset(3 * time.Second)
select {
Expand All @@ -636,6 +672,21 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
case <-quitCh:
return
}
// Check if we transitioned and if we did halt POW mining
headNumber, err := stages.GetStageProgress(tx, stages.Headers)
if err != nil {
log.Warn("mining", "err", err)
return
}

isTrans, err := rawdb.Transitioned(tx, headNumber, s.chainConfig.TerminalTotalDifficulty)
if err != nil {
log.Warn("mining", "err", err)
return
}
if isTrans {
return
}

if !works && hasWork {
works = true
Expand Down
10 changes: 9 additions & 1 deletion eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,16 @@ func HeadersPOS(
) error {
// Waiting for the beacon chain
log.Info("Waiting for payloads...")
var payloadMessage privateapi.PayloadMessage
atomic.StoreUint32(cfg.waitingPosHeaders, 1)
payloadMessage := <-cfg.reverseDownloadCh
// Decide what kind of action we need to take place
select {
case payloadMessage = <-cfg.reverseDownloadCh:
case <-cfg.hd.SkipCycleHack:
atomic.StoreUint32(cfg.waitingPosHeaders, 0)
return nil
}

atomic.StoreUint32(cfg.waitingPosHeaders, 0)
header := payloadMessage.Header

Expand Down
94 changes: 64 additions & 30 deletions eth/stagedsync/stage_mining_create_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,50 @@ type MiningBlock struct {
}

type MiningState struct {
MiningConfig *params.MiningConfig
PendingResultCh chan *types.Block
MiningResultCh chan *types.Block
MiningBlock *MiningBlock
MiningConfig *params.MiningConfig
PendingResultCh chan *types.Block
MiningResultCh chan *types.Block
MiningResultPOSCh chan *types.Block
MiningBlock *MiningBlock
}

func NewMiningState(cfg *params.MiningConfig) MiningState {
return MiningState{
MiningConfig: cfg,
PendingResultCh: make(chan *types.Block, 1),
MiningResultCh: make(chan *types.Block, 1),
MiningBlock: &MiningBlock{},
MiningConfig: cfg,
PendingResultCh: make(chan *types.Block, 1),
MiningResultCh: make(chan *types.Block, 1),
MiningResultPOSCh: make(chan *types.Block, 1),
MiningBlock: &MiningBlock{},
}
}

type BlockProposerParametersPOS struct {
Random common.Hash
SuggestedFeeRecipient common.Address // For now, we apply a suggested recipient only if etherbase is unset
Timestamp uint64
}

type MiningCreateBlockCfg struct {
db kv.RwDB
miner MiningState
chainConfig params.ChainConfig
engine consensus.Engine
txPool2 *txpool.TxPool
txPool2DB kv.RoDB
tmpdir string
db kv.RwDB
miner MiningState
chainConfig params.ChainConfig
engine consensus.Engine
txPool2 *txpool.TxPool
txPool2DB kv.RoDB
tmpdir string
blockProposerParameters *BlockProposerParametersPOS
}

func StageMiningCreateBlockCfg(db kv.RwDB, miner MiningState, chainConfig params.ChainConfig, engine consensus.Engine, txPool2 *txpool.TxPool, txPool2DB kv.RoDB, tmpdir string) MiningCreateBlockCfg {
func StageMiningCreateBlockCfg(db kv.RwDB, miner MiningState, chainConfig params.ChainConfig, engine consensus.Engine, txPool2 *txpool.TxPool, txPool2DB kv.RoDB, blockProposerParameters *BlockProposerParametersPOS, tmpdir string) MiningCreateBlockCfg {
return MiningCreateBlockCfg{
db: db,
miner: miner,
chainConfig: chainConfig,
engine: engine,
txPool2: txPool2,
txPool2DB: txPool2DB,
tmpdir: tmpdir,
db: db,
miner: miner,
chainConfig: chainConfig,
engine: engine,
txPool2: txPool2,
txPool2DB: txPool2DB,
tmpdir: tmpdir,
blockProposerParameters: blockProposerParameters,
}
}

Expand All @@ -85,10 +95,6 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
staleThreshold = 7
)

if cfg.miner.MiningConfig.Etherbase == (common.Address{}) {
return fmt.Errorf("refusing to mine without etherbase")
}

logPrefix := s.LogPrefix()
executionAt, err := s.ExecutionAt(tx)
if err != nil {
Expand All @@ -99,6 +105,19 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
return fmt.Errorf(fmt.Sprintf("[%s] Empty block", logPrefix), "blocknum", executionAt)
}

isTrans, err := rawdb.Transitioned(tx, executionAt, cfg.chainConfig.TerminalTotalDifficulty)
if err != nil {
return err
}

if cfg.miner.MiningConfig.Etherbase == (common.Address{}) {
if !isTrans {
return fmt.Errorf("refusing to mine without etherbase")
}
// If we do not have an etherbase, let's use the suggested one
coinbase = cfg.blockProposerParameters.SuggestedFeeRecipient
}

blockNum := executionAt + 1
var txs []types.Transaction
if err = cfg.txPool2DB.View(context.Background(), func(tx kv.Tx) error {
Expand Down Expand Up @@ -161,10 +180,15 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
}

// re-written miner/worker.go:commitNewWork
timestamp := time.Now().Unix()
if parent.Time >= uint64(timestamp) {
timestamp = int64(parent.Time + 1)
var timestamp int64
// If we are on proof-of-stake timestamp should be already set for us
if !isTrans {
timestamp = time.Now().Unix()
if parent.Time >= uint64(timestamp) {
timestamp = int64(parent.Time + 1)
}
}

num := parent.Number
header := &types.Header{
ParentHash: parent.Hash(),
Expand Down Expand Up @@ -202,6 +226,16 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
return err
}

if isTrans {
// We apply pre-made fields
header.MixDigest = cfg.blockProposerParameters.Random
header.Time = cfg.blockProposerParameters.Timestamp

current.Header = header
current.Uncles = nil
return nil
}

// If we are care about TheDAO hard-fork check whether to override the extra-data or not
if daoBlock := cfg.chainConfig.DAOForkBlock; daoBlock != nil {
// Check whether the block is among the fork extra-override range
Expand Down
11 changes: 11 additions & 0 deletions eth/stagedsync/stage_mining_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/log/v3"
Expand Down Expand Up @@ -54,6 +55,16 @@ func SpawnMiningFinishStage(s *StageState, tx kv.RwTx, cfg MiningFinishCfg, quit
//}
//prev = sealHash

// If we are on POS, we will send the result on the POS channel
isTrans, err := rawdb.Transitioned(tx, block.Header().Number.Uint64(), cfg.chainConfig.TerminalTotalDifficulty)
if err != nil {
return err
}

if isTrans {
cfg.miningState.MiningResultPOSCh <- block
return nil
}
// Tests may set pre-calculated nonce
if block.NonceU64() != 0 {
cfg.miningState.MiningResultCh <- block
Expand Down
1 change: 1 addition & 0 deletions ethdb/privateapi/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func StartGrpc(kv *remotedbserver.KvServer, ethBackendSrv *EthBackendServer, txP
if healthCheck {
defer healthServer.Shutdown()
}
defer ethBackendSrv.StopProposer()
if err := grpcServer.Serve(lis); err != nil {
log.Error("private RPC server fail", "err", err)
}
Expand Down
8 changes: 4 additions & 4 deletions ethdb/privateapi/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestMockDownloadRequest(t *testing.T) {
statusCh := make(chan ExecutionStatus)
waitingForHeaders := uint32(1)

backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false)

var err error
var reply *remote.EngineExecutePayloadReply
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestMockValidExecution(t *testing.T) {
statusCh := make(chan ExecutionStatus)
waitingForHeaders := uint32(1)

backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false)

var err error
var reply *remote.EngineExecutePayloadReply
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestMockInvalidExecution(t *testing.T) {
statusCh := make(chan ExecutionStatus)

waitingForHeaders := uint32(1)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false)

var err error
var reply *remote.EngineExecutePayloadReply
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestNoTTD(t *testing.T) {
statusCh := make(chan ExecutionStatus)
waitingForHeaders := uint32(1)

backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{}, reverseDownloadCh, statusCh, &waitingForHeaders)
backend := NewEthBackendServer(ctx, nil, db, nil, nil, &params.ChainConfig{}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false)

var err error

Expand Down
Loading

0 comments on commit 195eb9a

Please sign in to comment.