Skip to content

Commit

Permalink
segregate sync committee messages by period / fork (#4953)
Browse files Browse the repository at this point in the history
`SyncCommitteeMsgPool` grouped messages by their `beacon_block_root`.
This is problematic around sync committee period boundaries and forks.
Around sync committee period boundaries, members from both the current
and next sync committee may sign the same `beacon_block_root`; mixing
the signatures from both committees together is a mistake. Likewise,
around fork transitions, the `signing_root` changes, so those messages
also need to be segregated.
  • Loading branch information
etan-status authored May 17, 2023
1 parent b3c3b9a commit 40e8993
Show file tree
Hide file tree
Showing 14 changed files with 452 additions and 231 deletions.
7 changes: 5 additions & 2 deletions AllTests-mainnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,11 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
+ An empty pool is safe to prune OK
+ An empty pool is safe to prune 2 OK
+ An empty pool is safe to use OK
+ Missed slots across fork transition OK
+ Missed slots across sync committee period boundary OK
+ isSeen OK
```
OK: 4/4 Fail: 0/4 Skip: 0/4
OK: 7/7 Fail: 0/7 Skip: 0/7
## SyncManager test suite
```diff
+ Process all unviable blocks OK
Expand Down Expand Up @@ -677,4 +680,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 9/9 Fail: 0/9 Skip: 0/9

---TOTAL---
OK: 386/391 Fail: 0/391 Skip: 5/391
OK: 389/394 Fail: 0/394 Skip: 5/394
198 changes: 124 additions & 74 deletions beacon_chain/consensus_object_pools/sync_committee_msg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
{.push raises: [].}

import
std/[sets, tables],
std/[algorithm, sequtils, sets, tables],
stew/shims/hashes,
eth/p2p/discoveryv5/random2,
chronicles,
../spec/[crypto, digest],
../spec/[crypto, digest, forks],
../spec/datatypes/altair

export hashes, sets, tables, altair
Expand All @@ -24,12 +24,11 @@ const

type
SyncCommitteeMsgKey = object
originator: uint64 # ValidatorIndex avoiding mess with invalid values
originator: uint64 # ValidatorIndex to avoid invalid values
slot: Slot
subcommitteeIdx: uint64 # SyncSubcommitteeIndex avoiding mess with invalid values
subcommitteeIdx: uint64 # SyncSubcommitteeIndex to avoid invalid values

TrustedSyncCommitteeMsg* = object
slot*: Slot
subcommitteeIdx*: SyncSubcommitteeIndex
positionInCommittee*: uint64
signature*: CookedSig
Expand All @@ -40,93 +39,132 @@ type
signature*: CookedSig

BestSyncSubcommitteeContributions* = object
slot*: Slot
subnets*: array[SYNC_COMMITTEE_SUBNET_COUNT,
BestSyncSubcommitteeContribution]

OnSyncContributionCallback* =
proc(data: SignedContributionAndProof) {.gcsafe, raises: [Defect].}

# Messages from different slots / forks may sign the same beacon block root.
# Messages across slots are compatible, but not across forks (signing root).
# Messages from different periods have different signers, so are incompatible.
# Note that the sync committee is determined by `message.slot + 1`, the fork
# is determined by `message.slot`, and both can be different from `bid.slot`.
SyncMsgTarget = object
bid: BlockId # Based on message `beacon_block_root`
period: SyncCommitteePeriod # Based on message `slot + 1`
fork: ConsensusFork # Based on message `slot`

SyncCommitteeMsgPool* = object
seenSyncMsgByAuthor*: HashSet[SyncCommitteeMsgKey]
seenSyncMsgByAuthor*: Table[SyncCommitteeMsgKey, Eth2Digest]
seenContributionByAuthor*: HashSet[SyncCommitteeMsgKey]
syncMessages*: Table[Eth2Digest, seq[TrustedSyncCommitteeMsg]]
bestContributions*: Table[Eth2Digest, BestSyncSubcommitteeContributions]
syncMessages*: Table[SyncMsgTarget, seq[TrustedSyncCommitteeMsg]]
bestContributions*: Table[SyncMsgTarget, BestSyncSubcommitteeContributions]
onContributionReceived*: OnSyncContributionCallback

rng: ref HmacDrbgContext
cfg: RuntimeConfig

func hash*(x: SyncCommitteeMsgKey): Hash =
hashAllFields(x)

func toSyncMsgTarget(
cfg: RuntimeConfig, bid: BlockId, slot: Slot): SyncMsgTarget =
SyncMsgTarget(
bid: bid,
period: (slot + 1).sync_committee_period,
fork: cfg.consensusForkAtEpoch(slot.epoch))

func hash(x: SyncMsgTarget): Hash =
hashAllFields(x)

func `<`(x, y: SyncMsgTarget): bool =
if x.bid.slot != y.bid.slot:
x.bid.slot < y.bid.slot
elif x.period != y.period:
x.period < y.period
else:
x.fork < y.fork

func init*(T: type SyncCommitteeMsgPool,
rng: ref HmacDrbgContext,
cfg: RuntimeConfig,
onSyncContribution: OnSyncContributionCallback = nil
): SyncCommitteeMsgPool =
T(rng: rng, onContributionReceived: onSyncContribution)
T(rng: rng, cfg: cfg, onContributionReceived: onSyncContribution)

func pruneData*(pool: var SyncCommitteeMsgPool, slot: Slot) =
func pruneData*(pool: var SyncCommitteeMsgPool, slot: Slot, force = false) =
## This should be called at the end of slot.
clear pool.seenContributionByAuthor
clear pool.seenSyncMsgByAuthor

if slot < syncCommitteeMsgsRetentionSlots:
return

let minSlotToRetain = slot - syncCommitteeMsgsRetentionSlots
var syncMsgsToDelete: seq[Eth2Digest]
var contributionsToDelete: seq[Eth2Digest]

for blockRoot, msgs in pool.syncMessages:
if msgs[0].slot < minSlotToRetain:
syncMsgsToDelete.add blockRoot

for blockRoot in syncMsgsToDelete:
pool.syncMessages.del blockRoot
# Messages signing a `beacon_block_root` may remain valid over multiple slots.
# Therefore, we filter by the targeted `BlockId` instead of message `slot`.
let
minSlotToRetain = slot - syncCommitteeMsgsRetentionSlots
minEntriesToKeep = if force: 0 else: syncCommitteeMsgsRetentionSlots

for blockRoot, bestContributions in pool.bestContributions:
if bestContributions.slot < minSlotToRetain:
contributionsToDelete.add blockRoot
template pruneTable(table: untyped) =
if table.len > minEntriesToKeep:
var targets = table.keys().toSeq()
targets.sort(order = SortOrder.Descending)
for i in minEntriesToKeep ..< targets.len:
if targets[i].bid.slot < minSlotToRetain:
table.del targets[i]

for blockRoot in contributionsToDelete:
pool.bestContributions.del blockRoot
pruneTable pool.syncMessages
pruneTable pool.bestContributions

func isSeen*(
pool: SyncCommitteeMsgPool,
msg: SyncCommitteeMessage,
subcommitteeIdx: SyncSubcommitteeIndex): bool =
subcommitteeIdx: SyncSubcommitteeIndex,
headBid: BlockId): bool =
let seenKey = SyncCommitteeMsgKey(
originator: msg.validator_index, # Might be unvalidated at this point
originator: msg.validator_index, # Might be unvalidated at this point
slot: msg.slot,
subcommitteeIdx: subcommitteeIdx.uint64)
seenKey in pool.seenSyncMsgByAuthor
return
if seenKey notin pool.seenSyncMsgByAuthor:
false
elif msg.beacon_block_root == headBid.root:
pool.seenSyncMsgByAuthor.getOrDefault(seenKey) == headBid.root
else:
true

proc addSyncCommitteeMessage*(
pool: var SyncCommitteeMsgPool,
slot: Slot,
blockRoot: Eth2Digest,
bid: BlockId,
validatorIndex: uint64,
signature: CookedSig,
subcommitteeIdx: SyncSubcommitteeIndex,
positionsInCommittee: openArray[uint64]) =

let
seenKey = SyncCommitteeMsgKey(
originator: validatorIndex,
slot: slot,
subcommitteeIdx: subcommitteeIdx.uint64)

pool.seenSyncMsgByAuthor.incl seenKey

for position in positionsInCommittee:
pool.syncMessages.mgetOrPut(blockRoot, @[]).add TrustedSyncCommitteeMsg(
slot: slot,
subcommitteeIdx: subcommitteeIdx,
positionInCommittee: position,
signature: signature)
positionsInCommittee: seq[uint64]) =
let seenKey = SyncCommitteeMsgKey(
originator: validatorIndex,
slot: slot,
subcommitteeIdx: subcommitteeIdx.uint64)
pool.seenSyncMsgByAuthor[seenKey] = bid.root

func registerVotes(votes: var seq[TrustedSyncCommitteeMsg]) =
for position in positionsInCommittee:
block addVote:
for vote in votes:
if vote.subcommitteeIdx == subcommitteeIdx and
vote.positionInCommittee == position:
break addVote
votes.add TrustedSyncCommitteeMsg(
subcommitteeIdx: subcommitteeIdx,
positionInCommittee: position,
signature: signature)
let target = pool.cfg.toSyncMsgTarget(bid, slot)
pool.syncMessages.mgetOrPut(target, @[]).registerVotes()

debug "Sync committee message resolved",
slot = slot, blockRoot = shortLog(blockRoot), validatorIndex
slot = slot, blockRoot = shortLog(target.bid.root), validatorIndex

func computeAggregateSig(votes: seq[TrustedSyncCommitteeMsg],
subcommitteeIdx: SyncSubcommitteeIndex,
Expand All @@ -135,6 +173,7 @@ func computeAggregateSig(votes: seq[TrustedSyncCommitteeMsg],
aggregateSig {.noinit.}: AggregateSignature
initialized = false

contribution.aggregation_bits.reset()
for vote in votes:
if vote.subcommitteeIdx != subcommitteeIdx:
continue
Expand All @@ -150,21 +189,24 @@ func computeAggregateSig(votes: seq[TrustedSyncCommitteeMsg],

if initialized:
contribution.signature = aggregateSig.finish.toValidatorSig
else:
contribution.signature = ValidatorSig.infinity

initialized

func produceContribution*(
pool: SyncCommitteeMsgPool,
slot: Slot,
headRoot: Eth2Digest,
headBid: BlockId,
subcommitteeIdx: SyncSubcommitteeIndex,
outContribution: var SyncCommitteeContribution): bool =
if headRoot in pool.syncMessages:
let target = pool.cfg.toSyncMsgTarget(headBid, slot)
if target in pool.syncMessages:
outContribution.slot = slot
outContribution.beacon_block_root = headRoot
outContribution.beacon_block_root = headBid.root
outContribution.subcommittee_index = subcommitteeIdx.asUInt64
try:
computeAggregateSig(pool.syncMessages[headRoot],
computeAggregateSig(pool.syncMessages[target],
subcommitteeIdx,
outContribution)
except KeyError:
Expand Down Expand Up @@ -203,91 +245,99 @@ func covers(

func covers*(
pool: var SyncCommitteeMsgPool,
contribution: SyncCommitteeContribution): bool =
contribution: SyncCommitteeContribution,
bid: BlockId): bool =
## Return true iff the given contribution brings no new information compared
## to the contributions already seen in the pool, ie if the contriubution is a
## subset of the best contribution so far
pool.bestContributions.withValue(contribution.beacon_block_root, best):
let target = pool.cfg.toSyncMsgTarget(bid, contribution.slot)
pool.bestContributions.withValue(target, best):
return best[].covers(contribution)

return false

proc addContribution(pool: var SyncCommitteeMsgPool,
aggregator_index: uint64,
contribution: SyncCommitteeContribution,
bid: BlockId,
signature: CookedSig) =
let seenKey = SyncCommitteeMsgKey(
originator: aggregator_index,
slot: contribution.slot,
subcommitteeIdx: contribution.subcommittee_index)
pool.seenContributionByAuthor.incl seenKey

template blockRoot: auto = contribution.beacon_block_root

if blockRoot notin pool.bestContributions:
let target = pool.cfg.toSyncMsgTarget(bid, contribution.slot)
if target notin pool.bestContributions:
let totalParticipants = countOnes(contribution.aggregation_bits)
var initialBestContributions = BestSyncSubcommitteeContributions(
slot: contribution.slot)
var initialBestContributions = BestSyncSubcommitteeContributions()

initialBestContributions.subnets[contribution.subcommittee_index] =
BestSyncSubcommitteeContribution(
totalParticipants: totalParticipants,
participationBits: contribution.aggregation_bits,
signature: signature)

pool.bestContributions[blockRoot] = initialBestContributions
pool.bestContributions[target] = initialBestContributions
else:
try:
addAggregateAux(pool.bestContributions[blockRoot], contribution)
addAggregateAux(pool.bestContributions[target], contribution)
except KeyError:
raiseAssert "We have checked for the key upfront"

proc addContribution*(pool: var SyncCommitteeMsgPool,
scproof: SignedContributionAndProof,
bid: BlockId,
signature: CookedSig) =
pool.addContribution(
scproof.message.aggregator_index, scproof.message.contribution, signature)
scproof.message.aggregator_index,
scproof.message.contribution,
bid, signature)

if not(isNil(pool.onContributionReceived)):
pool.onContributionReceived(scproof)

proc produceSyncAggregateAux(
bestContributions: BestSyncSubcommitteeContributions): SyncAggregate =
contributions: BestSyncSubcommitteeContributions): SyncAggregate =
var
aggregateSig {.noinit.}: AggregateSignature
initialized = false
startTime = Moment.now

aggregate: SyncAggregate
for subcommitteeIdx in SyncSubcommitteeIndex:
if bestContributions.subnets[subcommitteeIdx].totalParticipants == 0:
if contributions.subnets[subcommitteeIdx].totalParticipants == 0:
continue

for pos, value in bestContributions.subnets[subcommitteeIdx].participationBits:
for pos, value in contributions.subnets[subcommitteeIdx].participationBits:
if value:
let globalPos = subcommitteeIdx.asInt * SYNC_SUBCOMMITTEE_SIZE + pos
result.sync_committee_bits.setBit globalPos
aggregate.sync_committee_bits.setBit globalPos

if not initialized:
initialized = true
aggregateSig.init(bestContributions.subnets[subcommitteeIdx].signature)
aggregateSig.init(contributions.subnets[subcommitteeIdx].signature)
else:
aggregateSig.aggregate(bestContributions.subnets[subcommitteeIdx].signature)
aggregateSig.aggregate(contributions.subnets[subcommitteeIdx].signature)

if initialized:
result.sync_committee_signature = aggregateSig.finish.toValidatorSig
aggregate.sync_committee_signature = aggregateSig.finish.toValidatorSig
else:
result.sync_committee_signature = ValidatorSig.infinity
aggregate.sync_committee_signature = ValidatorSig.infinity

let duration = Moment.now - startTime
debug "SyncAggregate produced", duration,
bits = result.sync_committee_bits
bits = aggregate.sync_committee_bits

aggregate

proc produceSyncAggregate*(
pool: SyncCommitteeMsgPool,
targetRoot: Eth2Digest): SyncAggregate =
if targetRoot in pool.bestContributions:
bid: BlockId,
slot: Slot): SyncAggregate =
let target = pool.cfg.toSyncMsgTarget(bid, slot)
if target in pool.bestContributions:
try:
produceSyncAggregateAux(pool.bestContributions[targetRoot])
produceSyncAggregateAux(pool.bestContributions[target])
except KeyError:
raiseAssert "We have checked for the key upfront"
else:
Expand Down
Loading

0 comments on commit 40e8993

Please sign in to comment.