Skip to content

Commit

Permalink
serve libp2p protocol for light client sync
Browse files Browse the repository at this point in the history
This extends the `--serve-light-client-data` launch option to serve
locally collected light client data via libp2p.
See ethereum/consensus-specs#2802

Backfill of historic best `LightClientUpdate` data and the client side
of the protocol are not yet implemented.

To test, in `conf.nim` change `serveLightClientData`'s `defaultValue` to
`true`, then run this command:
```
scripts/launch_local_testnet.sh --kill-old-processes --preset minimal \
    --nodes 4 --disable-htop --stop-at-epoch 7
```
The log files of the beacon nodes will be in the `local_testnet_data`
directory. They are named `log0.txt` through `log3.txt`. The logs can be
browsed for light client related messages.
  • Loading branch information
etan-status committed Jan 27, 2022
1 parent 6e414ce commit a9a1f09
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 18 deletions.
24 changes: 24 additions & 0 deletions beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ declareCounter beacon_sync_committee_contributions_received,
"Number of valid sync committee contributions processed by this node"
declareCounter beacon_sync_committee_contributions_dropped,
"Number of invalid sync committee contributions dropped by this node", labels = ["reason"]
declareCounter beacon_optimistic_light_client_updates_received,
"Number of valid optimistic light client updates processed by this node"
declareCounter beacon_optimistic_light_client_updates_dropped,
"Number of invalid optimistic light client updates dropped by this node", labels = ["reason"]

const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf]

Expand Down Expand Up @@ -528,3 +532,23 @@ proc contributionValidator*(
beacon_sync_committee_contributions_dropped.inc(1, [$v.error[0]])

err(v.error())

proc optimisticLightClientUpdateValidator*(
self: var Eth2Processor, src: MsgSource,
optimistic_update: OptimisticLightClientUpdate
): Result[void, ValidationError] =
logScope:
optimisticUpdate = shortLog(optimistic_update)

debug "Optimistic light client update received"

let v = self.dag.validateOptimisticLightClientUpdate(optimistic_update)
if v.isOk():
trace "Optimistic light client update validated"

beacon_optimistic_light_client_updates_received.inc()
else:
debug "Dropping optimistic light client update", error = v.error
beacon_optimistic_light_client_updates_dropped.inc(1, [$v.error[0]])

v
17 changes: 17 additions & 0 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -984,3 +984,20 @@ proc validateContribution*(
sig.get()

return ok((sig, participants))

# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/sync-protocol.md#optimistic_light_client_update
proc validateOptimisticLightClientUpdate*(
dag: ChainDAGRef, optimistic_update: OptimisticLightClientUpdate):
Result[void, ValidationError] =
template latest_local_update(): auto = dag.optimisticLightClientUpdate

if optimistic_update != latest_local_update:
# [IGNORE] The optimistic update is not attesting to the latest block's
# parent block.
if optimistic_update.attested_header != latest_local_update.attested_header:
return errIgnore("OptimisticLightClientUpdate: different attested block")

# [REJECT] The optimistic update does not match the expected value.
return errReject("OptimisticLightClientUpdate: update does not match block")

ok()
28 changes: 26 additions & 2 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type
Success
InvalidRequest
ServerError
ResourceUnavailable

PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe, raises: [Defect].}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe, raises: [Defect].}
Expand Down Expand Up @@ -204,6 +205,8 @@ type

InvalidInputsError* = object of CatchableError

ResourceUnavailableError* = object of CatchableError

NetRes*[T] = Result[T, Eth2NetworkingError]
## This is type returned from all network requests

Expand Down Expand Up @@ -707,6 +710,13 @@ proc handleIncomingStream(network: Eth2Node,
template returnInvalidRequest(msg: string) =
returnInvalidRequest(ErrorMsg msg.toBytes)

template returnResourceUnavailable(msg: ErrorMsg) =
await sendErrorResponse(peer, conn, ResourceUnavailable, msg)
return

template returnResourceUnavailable(msg: string) =
returnResourceUnavailable(ErrorMsg msg.toBytes)

let s = when useNativeSnappy:
let fs = libp2pInput(conn)

Expand Down Expand Up @@ -771,8 +781,8 @@ proc handleIncomingStream(network: Eth2Node,
await callUserHandler(MsgType, peer, conn, msg.get)
except InvalidInputsError as err:
returnInvalidRequest err.msg
await sendErrorResponse(peer, conn, ServerError,
ErrorMsg err.msg.toBytes)
except ResourceUnavailableError as err:
returnResourceUnavailable err.msg
except CatchableError as err:
await sendErrorResponse(peer, conn, ServerError,
ErrorMsg err.msg.toBytes)
Expand Down Expand Up @@ -2225,3 +2235,17 @@ proc broadcastSignedContributionAndProof*(
node: Eth2Node, msg: SignedContributionAndProof) =
let topic = getSyncCommitteeContributionAndProofTopic(node.forkDigests.altair)
node.broadcast(topic, msg)

proc broadcastOptimisticLightClientUpdate*(
node: Eth2Node, msg: OptimisticLightClientUpdate) =
let
forkDigest =
if msg.fork_version == node.cfg.SHARDING_FORK_VERSION:
node.forkDigests.sharding
elif msg.fork_version == node.cfg.BELLATRIX_FORK_VERSION:
node.forkDigests.bellatrix
else:
doAssert msg.fork_version == node.cfg.ALTAIR_FORK_VERSION
node.forkDigests.altair
topic = getOptimisticLightClientUpdateTopic(forkDigest)
node.broadcast(topic, msg)
4 changes: 2 additions & 2 deletions beacon_chain/networking/faststreams_backend.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand Down Expand Up @@ -112,7 +112,7 @@ proc readResponseChunk(s: AsyncInputStream,

let responseCode = ResponseCode responseCodeByte
case responseCode:
of InvalidRequest, ServerError:
of InvalidRequest, ServerError, ResourceUnavailable:
let errorMsgChunk = await readChunkPayload(s, noSnappy, string)
let errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
else: return err(errorMsgChunk.error)
Expand Down
4 changes: 2 additions & 2 deletions beacon_chain/networking/libp2p_streams_backend.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand Down Expand Up @@ -148,7 +148,7 @@ proc readResponseChunk(conn: Connection, peer: Peer,

let responseCode = ResponseCode responseCodeByte
case responseCode:
of InvalidRequest, ServerError:
of InvalidRequest, ServerError, ResourceUnavailable:
let errorMsgChunk = await readChunkPayload(conn, peer, ErrorMsg)
let errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
else: return err(errorMsgChunk.error)
Expand Down
19 changes: 19 additions & 0 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,10 @@ proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Sl
getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams)
node.network.updateSyncnetsMetadata(syncnets)

if node.config.serveLightClientData:
node.network.subscribe(
getOptimisticLightClientUpdateTopic(forkDigest), basicParams)

proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.removePhase0MessageHandlers(forkDigest)

Expand All @@ -742,6 +746,9 @@ proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(
getSyncCommitteeContributionAndProofTopic(forkDigest))

if node.config.serveLightClientData:
node.network.unsubscribe(getOptimisticLightClientUpdateTopic(forkDigest))

proc trackSyncCommitteeTopics*(node: BeaconNode) =
# TODO
discard
Expand Down Expand Up @@ -1149,6 +1156,18 @@ proc installMessageValidators(node: BeaconNode) =
installSyncCommitteeeValidators(node.dag.forkDigests.altair)
installSyncCommitteeeValidators(node.dag.forkDigests.bellatrix)

if node.config.serveLightClientData:
template installOptimisticLightClientUpdateValidator(digest: auto) =
node.network.addValidator(
getOptimisticLightClientUpdateTopic(digest),
proc(msg: OptimisticLightClientUpdate): ValidationResult =
toValidationResult(
node.processor[].optimisticLightClientUpdateValidator(
MsgSource.gossip, msg)))

installOptimisticLightClientUpdateValidator(node.dag.forkDigests.altair)
installOptimisticLightClientUpdateValidator(node.dag.forkDigests.bellatrix)

proc stop(node: BeaconNode) =
bnStatus = BeaconNodeStatus.Stopping
notice "Graceful shutdown"
Expand Down
7 changes: 6 additions & 1 deletion beacon_chain/spec/beacon_time.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand Down Expand Up @@ -146,6 +146,9 @@ const
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/validator.md#broadcast-sync-committee-contribution
syncContributionSlotOffset* = TimeDiff(nanoseconds:
NANOSECONDS_PER_SLOT.int64 * 2 div INTERVALS_PER_SLOT)
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/sync-protocol.md#block-proposal
optimisticLightClientUpdateSlotOffset* = TimeDiff(nanoseconds:
NANOSECONDS_PER_SLOT.int64 div INTERVALS_PER_SLOT)

func toFloatSeconds*(t: TimeDiff): float =
float(t.nanoseconds) / 1_000_000_000.0
Expand All @@ -167,6 +170,8 @@ func sync_committee_message_deadline*(s: Slot): BeaconTime =
s.start_beacon_time + syncCommitteeMessageSlotOffset
func sync_contribution_deadline*(s: Slot): BeaconTime =
s.start_beacon_time + syncContributionSlotOffset
func optimistic_light_client_update_deadline*(s: Slot): BeaconTime =
s.start_beacon_time + optimisticLightClientUpdateSlotOffset

func slotOrZero*(time: BeaconTime): Slot =
let exSlot = time.toSlot
Expand Down
17 changes: 16 additions & 1 deletion beacon_chain/spec/datatypes/altair.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import
std/[typetraits, sets, hashes],
chronicles,
stew/[assign2, bitops2],
stew/[assign2, bitops2, objects],
"."/[base, phase0]

export base, sets
Expand Down Expand Up @@ -601,9 +601,24 @@ chronicles.formatIt SyncCommitteeContribution: shortLog(it)
chronicles.formatIt ContributionAndProof: shortLog(it)
chronicles.formatIt SignedContributionAndProof: shortLog(it)

func shortLog*(v: LightClientUpdate): auto =
(
attested_header: shortLog(v.attested_header),
finalized_header: shortLog(v.finalized_header),
num_active_participants: countOnes(v.sync_aggregate.sync_committee_bits),
is_signed_by_next: v.next_sync_committee.isZeroMemory
)

template hash*(x: LightClientUpdate): Hash =
hash(x.header)

func shortLog*(v: OptimisticLightClientUpdate): auto =
(
attested_header: shortLog(v.attested_header),
num_active_participants: countOnes(v.sync_aggregate.sync_committee_bits),
is_signed_by_next: v.is_signed_by_next_sync_committee
)

func clear*(info: var EpochInfo) =
info.validators.setLen(0)
info.balances = UnslashedParticipatingBalances()
Expand Down
5 changes: 5 additions & 0 deletions beacon_chain/spec/network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func getSyncCommitteeContributionAndProofTopic*(forkDigest: ForkDigest): string
## For subscribing and unsubscribing to/from a subnet.
eth2Prefix(forkDigest) & "sync_committee_contribution_and_proof/ssz_snappy"

# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/sync-protocol.md#optimistic_light_client_update
func getOptimisticLightClientUpdateTopic*(forkDigest: ForkDigest): string =
## For broadcasting the latest `OptimisticLightClientUpdate` to light clients.
eth2Prefix(forkDigest) & "optimistic_light_client_update/ssz_snappy"

func getENRForkID*(cfg: RuntimeConfig,
epoch: Epoch,
genesis_validators_root: Eth2Digest): ENRForkID =
Expand Down
7 changes: 5 additions & 2 deletions beacon_chain/spec/ssz_codec.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand All @@ -17,7 +17,7 @@ export codec, base, typetraits

# Coding and decoding of SSZ to spec-specific types

template toSszType*(v: Slot|Epoch): auto = uint64(v)
template toSszType*(v: Slot|Epoch|SyncCommitteePeriod): auto = uint64(v)
template toSszType*(v: BlsCurveType): auto = toRaw(v)
template toSszType*(v: ForkDigest|GraffitiBytes): auto = distinctBase(v)
template toSszType*(v: Version): auto = distinctBase(v)
Expand All @@ -34,6 +34,9 @@ template fromSszBytes*(T: type Slot, bytes: openArray[byte]): T =
template fromSszBytes*(T: type Epoch, bytes: openArray[byte]): T =
T fromSszBytes(uint64, bytes)

template fromSszBytes*(T: type SyncCommitteePeriod, bytes: openArray[byte]): T =
T fromSszBytes(uint64, bytes)

func fromSszBytes*(T: type ForkDigest, bytes: openArray[byte]): T {.raisesssz.} =
if bytes.len != sizeof(result):
raiseIncorrectSize T
Expand Down
81 changes: 81 additions & 0 deletions beacon_chain/sync/sync_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ logScope:

const
MAX_REQUEST_BLOCKS = 1024
MAX_REQUEST_LIGHT_CLIENT_UPDATES = 128

blockByRootLookupCost = allowedOpsPerSecondCost(50)
blockResponseCost = allowedOpsPerSecondCost(100)
blockByRangeLookupCost = allowedOpsPerSecondCost(20)
lightClientUpdateResponseCost = allowedOpsPerSecondCost(100)
lightClientUpdateByRangeLookupCost = allowedOpsPerSecondCost(20)

type
StatusMsg* = object
Expand Down Expand Up @@ -356,6 +359,84 @@ p2pProtocol BeaconSync(version = 1,
debug "Block root request done",
peer, roots = blockRoots.len, count, found

proc bestLightClientUpdatesByRange(
peer: Peer,
startPeriod: SyncCommitteePeriod,
reqCount: uint64,
reqStep: uint64,
response: MultipleChunksResponse[altair.LightClientUpdate])
{.async, libp2pProtocol("best_light_client_updates_by_range", 1).} =
if not peer.networkState.dag.createLightClientData:
raise newException(ResourceUnavailableError, "Request not supported")

trace "Received BestLightClientUpdatesByRange request",
peer, startPeriod, count = reqCount, step = reqStep
if reqCount > 0'u64 and reqStep > 0'u64:
let
dag = peer.networkState.dag
headPeriod = dag.head.slot.sync_committee_period
# Limit number of updates in response
count =
if startPeriod < headPeriod:
0'u64
else:
min(reqCount,
min(1 + (headPeriod - startPeriod) div reqStep,
MAX_REQUEST_LIGHT_CLIENT_UPDATES))
onePastPeriod = startPeriod + reqStep * count
peer.updateRequestQuota(
lightClientUpdateByRangeLookupCost +
count.float * lightClientUpdateResponseCost)
peer.awaitNonNegativeRequestQuota()

var found = 0
for period in startPeriod..<onePastPeriod:
let update = dag.getBestLightClientUpdateForPeriod(period)
if update.isSome:
await response.write(update.get)
inc found

debug "BestLightClientUpdatesByRange request done",
peer, startPeriod, count, reqStep, found
else:
raise newException(InvalidInputsError, "Empty range requested")

proc latestLightClientUpdate(
peer: Peer,
response: SingleChunkResponse[altair.LightClientUpdate])
{.async, libp2pProtocol("latest_light_client_update", 1).} =
if not peer.networkState.dag.createLightClientData:
raise newException(ResourceUnavailableError, "Request not supported")

trace "Received GetLatestLightClientUpdate request", peer
let dag = peer.networkState.dag
peer.updateRequestQuota(lightClientUpdateResponseCost)
peer.awaitNonNegativeRequestQuota()
let update = dag.getLatestLightClientUpdate
if update.isSome:
await response.send(update.get)
else:
raise newException(ResourceUnavailableError,
"No LightClientUpdate available")

proc optimisticLightClientUpdate(
peer: Peer,
response: SingleChunkResponse[OptimisticLightClientUpdate])
{.async, libp2pProtocol("optimistic_light_client_update", 1).} =
if not peer.networkState.dag.createLightClientData:
raise newException(ResourceUnavailableError, "Request not supported")

trace "Received GetOptimisticLightClientUpdate request", peer
let dag = peer.networkState.dag
peer.updateRequestQuota(lightClientUpdateResponseCost)
peer.awaitNonNegativeRequestQuota()
let optimistic_update = dag.getOptimisticLightClientUpdate
if optimistic_update.isSome:
await response.send(optimistic_update.get)
else:
raise newException(ResourceUnavailableError,
"No OptimisticLightClientUpdate available")

proc goodbye(peer: Peer,
reason: uint64)
{.async, libp2pProtocol("goodbye", 1).} =
Expand Down
Loading

0 comments on commit a9a1f09

Please sign in to comment.