Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BN: Fix el_manager timeouts issue in block processing. #6665

Merged
merged 4 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 57 additions & 16 deletions beacon_chain/el/el_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ export
logScope:
topics = "elman"

const
SleepDurations =
[100.milliseconds, 200.milliseconds, 500.milliseconds, 1.seconds]

type
FixedBytes[N: static int] = web3.FixedBytes[N]
PubKeyBytes = DynamicBytes[48, 48]
Expand All @@ -43,6 +47,11 @@ type
WithoutTimeout* = distinct int
Address = web3.Address

DeadlineObject* = object
# TODO (cheatfate): This object declaration could be removed when
# `Raising()` macro starts to support procedure arguments.
future*: Future[void].Raising([CancelledError])

SomeEnginePayloadWithValue =
BellatrixExecutionPayloadWithValue |
GetPayloadV2Response |
Expand Down Expand Up @@ -233,6 +242,22 @@ declareCounter engine_api_last_minute_forkchoice_updates_sent,
"Number of last minute requests to the forkchoiceUpdated Engine API end-point just before block proposals",
labels = ["url"]

proc init*(t: typedesc[DeadlineObject], d: Duration): DeadlineObject =
DeadlineObject(future: sleepAsync(d))

proc variedSleep*(
counter: var int,
durations: openArray[Duration]
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
doAssert(len(durations) > 0, "Empty durations array!")
let index =
if (counter < 0) or (counter > len(durations)):
cheatfate marked this conversation as resolved.
Show resolved Hide resolved
len(durations) - 1
else:
counter
inc(counter)
sleepAsync(durations[index])

proc close(connection: ELConnection): Future[void] {.async: (raises: []).} =
if connection.web3.isSome:
try:
Expand Down Expand Up @@ -942,14 +967,16 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =

proc sendNewPayload*(
m: ELManager,
blck: SomeForkyBeaconBlock
blck: SomeForkyBeaconBlock,
deadlineObj: DeadlineObject
): Future[PayloadExecutionStatus] {.async: (raises: [CancelledError]).} =
let
startTime = Moment.now()
deadline = sleepAsync(NEWPAYLOAD_TIMEOUT)
deadline = deadlineObj.future
payload = blck.body.asEngineExecutionPayload
var
responseProcessor = ELConsensusViolationDetector.init()
sleepCounter = 0

while true:
block mainLoop:
Expand Down Expand Up @@ -1037,14 +1064,17 @@ proc sendNewPayload*(
# is not finished.

# To avoid continous spam of requests when EL node is offline we
# going to sleep until next attempt for
# (NEWPAYLOAD_TIMEOUT / 4) time (2.seconds).
let timeout =
chronos.nanoseconds(NEWPAYLOAD_TIMEOUT.nanoseconds div 4)
await sleepAsync(timeout)

# going to sleep until next attempt.
await variedSleep(sleepCounter, SleepDurations)
break mainLoop

proc sendNewPayload*(
m: ELManager,
blck: SomeForkyBeaconBlock
): Future[PayloadExecutionStatus] {.
async: (raises: [CancelledError], raw: true).} =
sendNewPayload(m, blck, DeadlineObject.init(FORKCHOICEUPDATED_TIMEOUT))
cheatfate marked this conversation as resolved.
Show resolved Hide resolved

proc forkchoiceUpdatedForSingleEL(
connection: ELConnection,
state: ref ForkchoiceStateV1,
Expand Down Expand Up @@ -1072,7 +1102,8 @@ proc forkchoiceUpdated*(
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
payloadAttributes: Opt[PayloadAttributesV1] |
Opt[PayloadAttributesV2] |
Opt[PayloadAttributesV3]
Opt[PayloadAttributesV3],
deadlineObj: DeadlineObject
): Future[(PayloadExecutionStatus, Opt[BlockHash])] {.
async: (raises: [CancelledError]).} =

Expand Down Expand Up @@ -1132,9 +1163,11 @@ proc forkchoiceUpdated*(
safeBlockHash: safeBlockHash.asBlockHash,
finalizedBlockHash: finalizedBlockHash.asBlockHash)
startTime = Moment.now
deadline = sleepAsync(FORKCHOICEUPDATED_TIMEOUT)
deadline = deadlineObj.future

var
responseProcessor = ELConsensusViolationDetector.init()
sleepCounter = 0

while true:
block mainLoop:
Expand Down Expand Up @@ -1218,14 +1251,22 @@ proc forkchoiceUpdated*(
# is not finished.

# To avoid continous spam of requests when EL node is offline we
# going to sleep until next attempt for
# (FORKCHOICEUPDATED_TIMEOUT / 4) time (2.seconds).
let timeout =
chronos.nanoseconds(FORKCHOICEUPDATED_TIMEOUT.nanoseconds div 4)
await sleepAsync(timeout)

# going to sleep until next attempt.
await variedSleep(sleepCounter, SleepDurations)
break mainLoop

proc forkchoiceUpdated*(
m: ELManager,
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
payloadAttributes: Opt[PayloadAttributesV1] |
Opt[PayloadAttributesV2] |
Opt[PayloadAttributesV3]
): Future[(PayloadExecutionStatus, Opt[BlockHash])] {.
async: (raises: [CancelledError], raw: true).} =
cheatfate marked this conversation as resolved.
Show resolved Hide resolved
forkchoiceUpdated(
m, headBlockHash, safeBlockHash, finalizedBlockHash,
payloadAttributes, DeadlineObject.init(FORKCHOICEUPDATED_TIMEOUT))

# TODO can't be defined within exchangeConfigWithSingleEL
func `==`(x, y: Quantity): bool {.borrow.}

Expand Down
54 changes: 40 additions & 14 deletions beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -230,19 +230,22 @@ from web3/engine_api_types import
PayloadAttributesV1, PayloadAttributesV2, PayloadAttributesV3,
PayloadExecutionStatus, PayloadStatusV1
from ../el/el_manager import
ELManager, forkchoiceUpdated, hasConnection, hasProperlyConfiguredConnection,
sendNewPayload
ELManager, DeadlineObject, forkchoiceUpdated, hasConnection,
hasProperlyConfiguredConnection, sendNewPayload, init

proc expectValidForkchoiceUpdated(
elManager: ELManager, headBlockPayloadAttributesType: typedesc,
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
receivedBlock: ForkySignedBeaconBlock): Future[void] {.async: (raises: [CancelledError]).} =
receivedBlock: ForkySignedBeaconBlock,
deadlineObj: DeadlineObject
): Future[void] {.async: (raises: [CancelledError]).} =
let
(payloadExecutionStatus, _) = await elManager.forkchoiceUpdated(
headBlockHash = headBlockHash,
safeBlockHash = safeBlockHash,
finalizedBlockHash = finalizedBlockHash,
payloadAttributes = Opt.none headBlockPayloadAttributesType)
payloadAttributes = Opt.none headBlockPayloadAttributesType,
deadlineObj = deadlineObj)
receivedExecutionBlockHash =
when typeof(receivedBlock).kind >= ConsensusFork.Bellatrix:
receivedBlock.message.body.execution_payload.block_hash
Expand Down Expand Up @@ -288,8 +291,10 @@ from ../spec/datatypes/capella import
from ../spec/datatypes/deneb import SignedBeaconBlock, asTrusted, shortLog

proc newExecutionPayload*(
elManager: ELManager, blck: SomeForkyBeaconBlock):
Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} =
elManager: ELManager,
blck: SomeForkyBeaconBlock,
deadlineObj: DeadlineObject
): Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} =

template executionPayload: untyped = blck.body.execution_payload

Expand All @@ -306,7 +311,7 @@ proc newExecutionPayload*(
executionPayload = shortLog(executionPayload)

try:
let payloadStatus = await elManager.sendNewPayload(blck)
let payloadStatus = await elManager.sendNewPayload(blck, deadlineObj)

debug "newPayload: succeeded",
parentHash = executionPayload.parent_hash,
Expand All @@ -323,22 +328,32 @@ proc newExecutionPayload*(
blockNumber = executionPayload.block_number
return Opt.none PayloadExecutionStatus

proc newExecutionPayload*(
elManager: ELManager,
blck: SomeForkyBeaconBlock
): Future[Opt[PayloadExecutionStatus]] {.
async: (raises: [CancelledError], raw: true).} =
newExecutionPayload(
elManager, blck, DeadlineObject.init(FORKCHOICEUPDATED_TIMEOUT))

proc getExecutionValidity(
elManager: ELManager,
blck: bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock |
deneb.SignedBeaconBlock | electra.SignedBeaconBlock):
Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} =
deneb.SignedBeaconBlock | electra.SignedBeaconBlock,
deadlineObj: DeadlineObject
): Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} =
if not blck.message.is_execution_block:
return NewPayloadStatus.valid # vacuously

try:
let executionPayloadStatus = await elManager.newExecutionPayload(
blck.message)
blck.message, deadlineObj)
if executionPayloadStatus.isNone:
return NewPayloadStatus.noResponse

case executionPayloadStatus.get
of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
of PayloadExecutionStatus.invalid,
PayloadExecutionStatus.invalid_block_hash:
# Blocks come either from gossip or request manager requests. In the
# former case, they've passed libp2p gosisp validation which implies
# correct signature for correct proposer,which makes spam expensive,
Expand Down Expand Up @@ -424,6 +439,14 @@ proc storeBlock(
vm = self.validatorMonitor
dag = self.consensusManager.dag
wallSlot = wallTime.slotOrZero
deadlineTime =
block:
let slotTime = (wallSlot + 1).start_beacon_time() - 1.seconds
if slotTime <= wallTime:
0.seconds
else:
chronos.nanoseconds((slotTime - wallTime).nanoseconds)
deadlineObj = DeadlineObject.init(deadlineTime)
Comment on lines +448 to +455
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. I think this proc is called also when syncing. If a block gets synced shortly before the end of a slot, this may give it a very short newPayload deadline (or even 0).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is present an overloaded procedure which still uses 8.seconds timeout and it will be used in all the places where DeadlineObject is not used as last argument.


# If the block is missing its parent, it will be re-orphaned below
self.consensusManager.quarantine[].removeOrphan(signedBlock)
Expand Down Expand Up @@ -529,7 +552,8 @@ proc storeBlock(
NewPayloadStatus.noResponse
else:
when typeof(signedBlock).kind >= ConsensusFork.Bellatrix:
await self.consensusManager.elManager.getExecutionValidity(signedBlock)
await self.consensusManager.elManager.getExecutionValidity(
signedBlock, deadlineObj)
else:
NewPayloadStatus.valid # vacuously
payloadValid = payloadStatus == NewPayloadStatus.valid
Expand Down Expand Up @@ -696,7 +720,8 @@ proc storeBlock(
self.consensusManager[].optimisticExecutionBlockHash,
safeBlockHash = newHead.get.safeExecutionBlockHash,
finalizedBlockHash = newHead.get.finalizedExecutionBlockHash,
payloadAttributes = Opt.none attributes)
payloadAttributes = Opt.none attributes,
deadlineObj = deadlineObj)

let consensusFork = self.consensusManager.dag.cfg.consensusForkAtEpoch(
newHead.get.blck.bid.slot.epoch)
Expand All @@ -723,7 +748,8 @@ proc storeBlock(
headBlockHash = headExecutionBlockHash,
safeBlockHash = newHead.get.safeExecutionBlockHash,
finalizedBlockHash = newHead.get.finalizedExecutionBlockHash,
receivedBlock = signedBlock)
receivedBlock = signedBlock,
deadlineObj = deadlineObj)

template callForkChoiceUpdated: auto =
case self.consensusManager.dag.cfg.consensusForkAtEpoch(
Expand Down
Loading