Skip to content

Commit

Permalink
Eth1 monitor fixes
Browse files Browse the repository at this point in the history
* Fix a resource leak introduced in #3279

* Don't restart the Eth1 syncing proggress from scratch in case of
  monitor failures during Eth2 syncing.

* Switch to the primary operator as soon as it is back online.

* Log the web3 credentials in fewer places

Other changes:

The 'web3 test' command has been enhanced to obtain and print more
data regarding the selected provider.
  • Loading branch information
zah committed Feb 2, 2022
1 parent 702d9e8 commit 1d3d914
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 52 deletions.
5 changes: 4 additions & 1 deletion beacon_chain/consensus_object_pools/block_pools_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type
OnReorgCallback* =
proc(data: ReorgInfoObject) {.gcsafe, raises: [Defect].}
OnFinalizedCallback* =
proc(data: FinalizationInfoObject) {.gcsafe, raises: [Defect].}
proc(dag: ChainDAGRef, data: FinalizationInfoObject) {.gcsafe, raises: [Defect].}

KeyedBlockRef* = object
# Special wrapper for BlockRef used in ChainDAG.blocks that allows lookup
Expand Down Expand Up @@ -250,6 +250,9 @@ func shortLog*(v: EpochKey): string =
# epoch:root when logging epoch, root:slot when logging slot!
$v.epoch & ":" & shortLog(v.blck)

template setFinalizationCb*(dag: ChainDAGRef, cb: OnFinalizedCallback) =
dag.onFinHappened = cb

func shortLog*(v: EpochRef): string =
# epoch:root when logging epoch, root:slot when logging slot!
if v.isNil():
Expand Down
2 changes: 1 addition & 1 deletion beacon_chain/consensus_object_pools/blockchain_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,7 @@ proc updateHead*(
dag.finalizedHead.blck.root,
stateRoot,
dag.finalizedHead.slot.epoch)
dag.onFinHappened(data)
dag.onFinHappened(dag, data)

proc isInitialized*(T: type ChainDAGRef, db: BeaconChainDB): Result[void, cstring] =
# Lightweight check to see if we have the minimal information needed to
Expand Down
173 changes: 132 additions & 41 deletions beacon_chain/eth1/eth1_monitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type
Eth1MonitorState = enum
Initialized
Started
ReadyToRestartToPrimary
Failed
Stopping
Stopped
Expand Down Expand Up @@ -392,7 +393,6 @@ template awaitWithRetries*[T](lazyFutExpr: Future[T],
timeout = web3Timeouts): untyped =
const
reqType = astToStr(lazyFutExpr)

var
retryDelayMs = 16000
f: Future[T]
Expand Down Expand Up @@ -714,17 +714,19 @@ func lowerBound(chain: Eth1Chain, depositCount: uint64): Eth1Block =

proc trackFinalizedState(chain: var Eth1Chain,
finalizedEth1Data: Eth1Data,
finalizedStateDepositIndex: uint64): bool =
finalizedStateDepositIndex: uint64,
blockProposalExpected = false): bool =
# Returns true if the Eth1Monitor is synced to the finalization point
if chain.blocks.len == 0:
debug "Eth1 chain not initialized"
return false

let latest = chain.blocks.peekLast
if latest.voteData.deposit_count < finalizedEth1Data.deposit_count:
warn "Eth1 chain not synced",
ourDepositsCount = latest.voteData.deposit_count,
targetDepositsCount = finalizedEth1Data.deposit_count
if blockProposalExpected:
error "The Eth1 chain is not synced",
ourDepositsCount = latest.voteData.deposit_count,
targetDepositsCount = finalizedEth1Data.deposit_count
return false

let matchingBlock = chain.lowerBound(finalizedEth1Data.deposit_count)
Expand Down Expand Up @@ -764,7 +766,8 @@ proc getBlockProposalData*(chain: var Eth1Chain,
let
periodStart = voting_period_start_time(state)
hasLatestDeposits = chain.trackFinalizedState(finalizedEth1Data,
finalizedStateDepositIndex)
finalizedStateDepositIndex,
blockProposalExpected = true)

var otherVotesCountTable = initCountTable[Eth1Data]()
for vote in getStateField(state, eth1_data_votes):
Expand Down Expand Up @@ -900,7 +903,6 @@ proc init*(T: type Eth1Monitor,
eth1Network: Option[Eth1Network],
forcePolling: bool): T =
doAssert web3Urls.len > 0

var web3Urls = web3Urls
for url in mitems(web3Urls):
fixupWeb3Urls url
Expand All @@ -926,11 +928,41 @@ func clear(chain: var Eth1Chain) =
chain.blocksByHash.clear()
chain.hasConsensusViolation = false

proc resetState(m: Eth1Monitor) {.async.} =
safeCancel m.runFut
proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} =
const checkInterval = chronos.seconds(30)

let
web3Url = m.web3Urls[0]
initialRunFut = m.runFut

# This is a way to detect that the monitor was restarted. When this
# happens, this function will just return terminating the "async thread"
while m.runFut == initialRunFut:
let tempProviderRes = await Web3DataProvider.new(
m.depositContractAddress,
web3Url)

if tempProviderRes.isErr:
await sleepAsync(checkInterval)
continue

var tempProvider = tempProviderRes.get
var testRequest = tempProvider.web3.provider.net_version()

yield testRequest

try: await tempProvider.close()
except CatchableError as err:
debug "Failed to close temp web3 provider", err = err.msg

m.depositsChain.clear()
m.latestEth1Block = none(FullBlockId)
if testRequest.failed:
await sleepAsync(checkInterval)
elif m.state == Started:
m.state = ReadyToRestartToPrimary
return

proc doStop(m: Eth1Monitor) {.async.} =
safeCancel m.runFut

if m.dataProvider != nil:
await m.dataProvider.close()
Expand All @@ -950,9 +982,9 @@ proc ensureDataProvider*(m: Eth1Monitor) {.async.} =
v.get()

proc stop(m: Eth1Monitor) {.async.} =
if m.state == Started:
if m.state in {Started, ReadyToRestartToPrimary}:
m.state = Stopping
m.stopFut = resetState(m)
m.stopFut = m.doStop()
await m.stopFut
m.state = Stopped
elif m.state == Stopping:
Expand Down Expand Up @@ -1002,7 +1034,6 @@ proc syncBlockRange(m: Eth1Monitor,
# Reduce all request rate until we have a more general solution
# for dealing with Infura's rate limits
await sleepAsync(milliseconds(backoff))

let jsonLogsFut = m.dataProvider.ns.getJsonLogs(
DepositEvent,
fromBlock = some blockId(currentBlock),
Expand Down Expand Up @@ -1142,25 +1173,61 @@ func init(T: type FullBlockId, blk: Eth1BlockHeader|BlockObject): T =
FullBlockId(number: Eth1BlockNumber blk.number, hash: blk.hash)

proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
if m.state == Failed:
await m.resetState()
elif m.state == Stopping:
await m.stopFut
if m.state in {Started, ReadyToRestartToPrimary}:
return

let isFirstRun = m.state == Initialized

if delayBeforeStart != ZeroDuration:
await sleepAsync(delayBeforeStart)

# If the monitor died with an exception, the web3 provider may be in
# an arbitary state, so we better reset it (not doing this has resulted
# in resource leaks historically).
if not m.dataProvider.isNil and m.state == Failed:
# We introduce a local var to eliminate the risk of scheduling two
# competing calls to `close` below.
let provider = m.dataProvider
m.dataProvider = nil
await provider.close()

m.state = Started
await m.ensureDataProvider()
let
web3Url = m.web3Urls[(m.startIdx + m.web3Urls.len - 1) mod m.web3Urls.len]
web3 = m.dataProvider.web3

# We might need to reset the chain if the new provider disagrees
# with the previous one regarding the history of the chain or if
# we have detected a conensus violation - our view disagreeing with
# the majority of the validators in the network.
#
# Consensus violations happen in practice because the web3 providers
# sometimes return incomplete or incorrect deposit log events even
# when they don't indicate any errors in the response. When this
# happens, we are usually able to download the data successfully
# on the second attempt.
if m.latestEth1Block.isSome and m.depositsChain.blocks.len > 0:
let needsReset = m.depositsChain.hasConsensusViolation or (block:
let
lastKnownBlock = m.depositsChain.blocks.peekLast
matchingBlockAtNewProvider = awaitWithRetries(
m.dataProvider.getBlockByNumber lastKnownBlock.number)

lastKnownBlock.voteData.block_hash.asBlockHash != matchingBlockAtNewProvider.hash)

if needsReset:
m.depositsChain.clear()
m.latestEth1Block = none(FullBlockId)

template web3Url: string = m.dataProvider.url

if web3Url != m.web3Urls[0]:
asyncSpawn m.detectPrimaryProviderComingOnline()

info "Starting Eth1 deposit contract monitoring",
contract = $m.depositContractAddress, url = web3Url
contract = $m.depositContractAddress

if m.state == Initialized and m.eth1Network.isSome:
if isFirstRun and m.eth1Network.isSome:
let
providerNetwork = awaitWithRetries web3.provider.net_version()
providerNetwork = awaitWithRetries m.dataProvider.web3.provider.net_version()
expectedNetwork = case m.eth1Network.get
of mainnet: "1"
of rinkeby: "4"
Expand All @@ -1170,7 +1237,6 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
expectedNetwork, providerNetwork
quit 1

m.state = Started
var mustUsePolling = m.forcePolling or
web3Url.startsWith("http://") or
web3Url.startsWith("https://")
Expand All @@ -1190,24 +1256,24 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
proc subscriptionErrorHandler(err: CatchableError)
{.raises: [Defect], gcsafe.} =
warn "Failed to subscribe for block headers. Switching to polling",
web3Url, err = err.msg
err = err.msg
mustUsePolling = true

await m.dataProvider.onBlockHeaders(newBlockHeadersHandler,
subscriptionErrorHandler)

let startBlock = awaitWithRetries(
m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash))
if m.depositsChain.blocks.len == 0:
let startBlock = awaitWithRetries(
m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash))

doAssert m.depositsChain.blocks.len == 0
m.depositsChain.addBlock Eth1Block(
number: Eth1BlockNumber startBlock.number,
timestamp: Eth1BlockTimestamp startBlock.timestamp,
voteData: eth1DataFromMerkleizer(
m.depositsChain.finalizedBlockHash,
m.depositsChain.finalizedDepositsMerkleizer))
m.depositsChain.addBlock Eth1Block(
number: Eth1BlockNumber startBlock.number,
timestamp: Eth1BlockTimestamp startBlock.timestamp,
voteData: eth1DataFromMerkleizer(
m.depositsChain.finalizedBlockHash,
m.depositsChain.finalizedDepositsMerkleizer))

var eth1SyncedTo = Eth1BlockNumber startBlock.number
var eth1SyncedTo = Eth1BlockNumber m.depositsChain.blocks.peekLast.number
eth1_synced_head.set eth1SyncedTo.toGaugeValue
eth1_finalized_head.set eth1SyncedTo.toGaugeValue
eth1_finalized_deposits.set(
Expand All @@ -1229,6 +1295,11 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
if m.depositsChain.hasConsensusViolation:
raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus")

if m.state == ReadyToRestartToPrimary:
info "Primary web3 provider is back online. Restarting the Eth1 monitor"
m.startIdx = 0
return

if mustUsePolling:
let blk = awaitWithRetries(
m.dataProvider.web3.provider.eth_getBlockByNumber(blockId("latest"), false))
Expand Down Expand Up @@ -1271,7 +1342,6 @@ proc start(m: Eth1Monitor, delayBeforeStart: Duration) =
if runFut.error[] of CatchableError:
if runFut == m.runFut:
warn "Eth1 chain monitoring failure, restarting", err = runFut.error.msg
m.dataProvider = nil
m.state = Failed
else:
fatal "Fatal exception reached", err = runFut.error.msg
Expand Down Expand Up @@ -1303,13 +1373,35 @@ proc testWeb3Provider*(web3Url: Uri,
let
web3 = mustSucceed "connect to web3 provider":
await newWeb3($web3Url)
network = mustSucceed "get network version":
networkVersion = mustSucceed "get network version":
awaitWithRetries web3.provider.net_version()
latestBlock = mustSucceed "get latest block":
awaitWithRetries web3.provider.eth_getBlockByNumber(blockId("latest"), false)

echo "Network: ", network
syncStatus = mustSucceed "get sync status":
awaitWithRetries web3.provider.eth_syncing()
listening = mustSucceed "get network listening":
awaitWithRetries web3.provider.net_listening()
peers =
try:
awaitWithRetries web3.provider.net_peerCount()
except:
0
clientVersion = mustSucceed "get client version":
awaitWithRetries web3.provider.web3_clientVersion()
protocolVersion = mustSucceed "get protocol version":
awaitWithRetries web3.provider.eth_protocolVersion()
mining = mustSucceed "get mining status":
awaitWithRetries web3.provider.eth_mining()

echo "Client Version: ", clientVersion
echo "Protocol Version: ", protocolVersion, " (", $protocolVersion.fromHex[:int], ")"
echo "Network Version: ", networkVersion
echo "Network Listening: ", listening
echo "Network Peers: ", peers
echo "Syncing: ", syncStatus
echo "Latest block: ", latestBlock.number.uint64
echo "Last Known Nonce: ", web3.lastKnownNonce
echo "Mining: ", mining

let ns = web3.contractSender(DepositContract, depositContractAddress)
try:
Expand All @@ -1318,7 +1410,6 @@ proc testWeb3Provider*(web3Url: Uri,
echo "Deposit root: ", depositRoot
except CatchableError as err:
echo "Web3 provider is not archive mode: ", err.msg

when hasGenesisDetection:
proc init*(T: type Eth1Monitor,
cfg: RuntimeConfig,
Expand Down
24 changes: 16 additions & 8 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,19 @@ proc init*(T: type BeaconNode,
eventBus.emit("head-change", data)
proc onChainReorg(data: ReorgInfoObject) =
eventBus.emit("chain-reorg", data)
proc onFinalization(data: FinalizationInfoObject) =
eventBus.emit("finalization", data)
proc makeOnFinalizationCb(
# This `nimcall` functions helps for keeping track of what
# needs to be captured by the onFinalization closure.
eventBus: AsyncEventBus,
eth1Monitor: Eth1Monitor): OnFinalizedCallback {.nimcall.} =
static: doAssert (eventBus is ref) and (eth1Monitor is ref)
return proc(dag: ChainDAGRef, data: FinalizationInfoObject) =
if eth1Monitor != nil:
let finalizedEpochRef = dag.getFinalizedEpochRef()
discard trackFinalizedState(eth1Monitor,
finalizedEpochRef.eth1_data,
finalizedEpochRef.eth1_deposit_index)
eventBus.emit("finalization", data)
proc onSyncContribution(data: SignedContributionAndProof) =
eventBus.emit("sync-contribution-and-proof", data)

Expand Down Expand Up @@ -341,7 +352,7 @@ proc init*(T: type BeaconNode,
else: {}
dag = ChainDAGRef.init(
cfg, db, validatorMonitor, chainDagFlags, onBlockAdded, onHeadChanged,
onChainReorg, onFinalization)
onChainReorg)
quarantine = newClone(Quarantine.init())
databaseGenesisValidatorsRoot =
getStateField(dag.headState.data, genesis_validators_root)
Expand Down Expand Up @@ -509,6 +520,8 @@ proc init*(T: type BeaconNode,
else:
nil

dag.setFinalizationCb makeOnFinalizationCb(eventBus, eth1Monitor)

var node = BeaconNode(
nickname: nickname,
graffitiBytes: if config.graffiti.isSome: config.graffiti.get
Expand Down Expand Up @@ -1106,11 +1119,6 @@ proc onSlotStart(

await node.handleValidatorDuties(lastSlot, wallSlot)

if node.eth1Monitor != nil and (wallSlot mod SLOTS_PER_EPOCH) == 0:
let finalizedEpochRef = node.dag.getFinalizedEpochRef()
discard node.eth1Monitor.trackFinalizedState(
finalizedEpochRef.eth1_data, finalizedEpochRef.eth1_deposit_index)

await onSlotEnd(node, wallSlot)

proc handleMissingBlocks(node: BeaconNode) =
Expand Down
2 changes: 1 addition & 1 deletion vendor/nim-json-rpc

0 comments on commit 1d3d914

Please sign in to comment.