Skip to content

Commit

Permalink
Use single sync.Cond. Proposer shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
yperbasis committed Jan 4, 2022
1 parent f612e04 commit 4de1c30
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 40 deletions.
1 change: 1 addition & 0 deletions ethdb/privateapi/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func StartGrpc(kv *remotedbserver.KvServer, ethBackendSrv *EthBackendServer, txP
if healthCheck {
defer healthServer.Shutdown()
}
defer ethBackendSrv.StopProposer()
if err := grpcServer.Serve(lis); err != nil {
log.Error("private RPC server fail", "err", err)
}
Expand Down
81 changes: 41 additions & 40 deletions ethdb/privateapi/ethbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ type EthBackendServer struct {
skipCycleHack chan struct{} // with this channel we tell the stagedsync that we want to assemble a block
assemblePayloadPOS assemblePayloadPOSFunc
proposing bool
mu sync.Mutex // Engine API is asyncronous, we want to avoid CL to call different APIs at the same time
condPauseAssemble *sync.Cond // We use it to determine if we can assemble payloads or not
condFinishAssemble *sync.Cond // We use it to determined if we an assembling
syncCond *sync.Cond // Engine API is asynchronous, we want to avoid CL to call different APIs at the same time
shutdown bool
}

type EthBackend interface {
Expand Down Expand Up @@ -96,8 +95,7 @@ func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events
return &EthBackendServer{ctx: ctx, eth: eth, events: events, db: db, blockReader: blockReader, config: config,
reverseDownloadCh: reverseDownloadCh, statusCh: statusCh, waitingForBeaconChain: waitingForBeaconChain,
pendingPayloads: make(map[uint64]types2.ExecutionPayload), skipCycleHack: skipCycleHack,
assemblePayloadPOS: assemblePayloadPOS, proposing: proposing, condPauseAssemble: sync.NewCond(&sync.Mutex{}),
condFinishAssemble: sync.NewCond(&sync.Mutex{}),
assemblePayloadPOS: assemblePayloadPOS, proposing: proposing, syncCond: sync.NewCond(&sync.Mutex{}),
}
}

Expand Down Expand Up @@ -209,8 +207,9 @@ func (s *EthBackendServer) Block(ctx context.Context, req *remote.BlockRequest)

// EngineExecutePayloadV1, executes payload
func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *types2.ExecutionPayload) (*remote.EngineExecutePayloadReply, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()

if s.config.TerminalTotalDifficulty == nil {
return nil, fmt.Errorf("not a proof-of-stake chain")
}
Expand Down Expand Up @@ -281,9 +280,8 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type

// EngineGetPayloadV1, retrieves previously assembled payload (Validators only)
func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.EngineGetPayloadRequest) (*types2.ExecutionPayload, error) {
s.condFinishAssemble.L.Lock()
defer s.condFinishAssemble.L.Unlock()
// Wait some time in case validition process has not started
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()

if !s.proposing {
return nil, fmt.Errorf("execution layer not running as a proposer. enable --proposer flag on startup")
Expand All @@ -292,29 +290,26 @@ func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.E
if s.config.TerminalTotalDifficulty == nil {
return nil, fmt.Errorf("not a proof-of-stake chain")
}
s.mu.Lock()
defer s.mu.Unlock()
payload, ok := s.pendingPayloads[req.PayloadId]
if !ok {
return nil, fmt.Errorf("unknown payload")
}
if payload.BlockNumber == 0 {
s.mu.Unlock()

for {
payload, ok := s.pendingPayloads[req.PayloadId]
if !ok {
return nil, fmt.Errorf("unknown payload")
}

if payload.BlockNumber != 0 {
return &payload, nil
}

// Wait for payloads assembling thread to finish
s.condFinishAssemble.Wait()
s.mu.Lock()
s.syncCond.Wait()
}
// stop assembling payloads
return &payload, nil

}

// EngineForkChoiceUpdatedV1, either states new block head or request the assembling of a new bloc
func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.condPauseAssemble.L.Lock()
defer s.condPauseAssemble.L.Unlock()
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()

if s.config.TerminalTotalDifficulty == nil {
return nil, fmt.Errorf("not a proof-of-stake chain")
Expand Down Expand Up @@ -351,7 +346,7 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r
Coinbase: req.Prepare.FeeRecipient,
}
// Unpause assemble process
s.condPauseAssemble.Broadcast()
s.syncCond.Broadcast()
// successfully assembled the payload and assigned the correct id
defer func() { s.payloadId++ }()
return &remote.EngineForkChoiceUpdatedReply{
Expand All @@ -363,15 +358,17 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r
func (s *EthBackendServer) StartProposer() {

go func() {
s.condPauseAssemble.L.Lock()
defer s.condPauseAssemble.L.Unlock()
s.condFinishAssemble.L.Lock()
defer s.condFinishAssemble.L.Unlock()
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()

for {
// Wait until we have to process new payloads
s.condPauseAssemble.Wait()
s.mu.Lock()
s.syncCond.Wait()

if s.shutdown {
return
}

// Go over each payload and re-update them
for id := range s.pendingPayloads {
// If we already assembled this block, let's just skip it
Expand All @@ -384,12 +381,10 @@ func (s *EthBackendServer) StartProposer() {
timestamp := s.pendingPayloads[id].Timestamp
// Tell the stage headers to leave space for the write transaction for mining stages
s.skipCycleHack <- struct{}{}
s.mu.Unlock()

block, err := s.assemblePayloadPOS(random, coinbase, timestamp)
s.mu.Lock()
if err != nil {
log.Warn("Error during block assembling", "err", err.Error())
s.mu.Unlock()
return
}
var baseFeeReply *types2.H256
Expand All @@ -407,7 +402,6 @@ func (s *EthBackendServer) StartProposer() {
err := rlp.Encode(buf, tx)
if err != nil {
log.Warn("Broken tx rlp", "err", err.Error())
s.mu.Unlock()
return
}
encodedTransactions = append(encodedTransactions, common.CopyBytes(buf.Bytes()))
Expand All @@ -431,12 +425,19 @@ func (s *EthBackendServer) StartProposer() {
}
}
// Broadcast the signal that an entire loop over pending payloads has been executed
s.condFinishAssemble.Broadcast()
s.mu.Unlock()
s.syncCond.Broadcast()
}
}()
}

func (s *EthBackendServer) StopProposer() {
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()

s.shutdown = true
s.syncCond.Broadcast()
}

func (s *EthBackendServer) NodeInfo(_ context.Context, r *remote.NodesInfoRequest) (*remote.NodesInfoReply, error) {
nodesInfo, err := s.eth.NodesInfo(int(r.Limit))
if err != nil {
Expand Down

0 comments on commit 4de1c30

Please sign in to comment.