Skip to content

Commit

Permalink
harden and speed up block sync
Browse files Browse the repository at this point in the history
The `GetBlockBy*` server implementation currently reads SSZ bytes from
database, deserializes them into a Nim object then serializes them right
back to SSZ - here, we eliminate the deser/ser steps and send the bytes
straight to the network. Unfortunately, the snappy recoding must still
be done because of differences in framing.

Also, the quota system makes one giant request for quota right before
sending all blocks - this means that a 1024 block request will be
"paused" for a long time, then all blocks will be sent at once causing a
spike in database reads which potentially will see the reading client
time out before any block is sent.

Finally, on the reading side we make several copies of blocks as they
travel through various queues - this was not noticeable before but
becomes a problem in two cases: bellatrix blocks are up to 10mb (instead
of .. 30-40kb) and when backfilling, we process a lot more of them a lot
faster.

* fix status comparisons for nodes syncing from genesis (#3327 was a bit
too hard)
* don't hit database at all for post-altair slots in GetBlock v1
requests
  • Loading branch information
arnetheduck committed Feb 4, 2022
1 parent 49282e9 commit e8e52f3
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 192 deletions.
18 changes: 12 additions & 6 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ proc getRequestProtoName(fn: NimNode): NimNode =

proc writeChunk*(conn: Connection,
responseCode: Option[ResponseCode],
payload: Bytes,
contextBytes: openarray[byte] = []): Future[void] =
payload: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] =
var output = memoryOutput()

try:
Expand Down Expand Up @@ -558,13 +558,14 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
finally:
await stream.close()

proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes): Future[void] =
proc sendResponseChunkBytes(
response: UntypedResponse, payload: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] =
inc response.writtenChunks
response.stream.writeChunk(some Success, payload)
response.stream.writeChunk(some Success, payload, contextBytes)

proc sendResponseChunk*(response: UntypedResponse, val: auto): Future[void] =
inc response.writtenChunks
response.stream.writeChunk(some Success, SSZ.encode(val))
sendResponseChunkBytes(response, SSZ.encode(val))

template sendUserHandlerResultAsChunkImpl*(stream: Connection,
handlerResultFut: Future): untyped =
Expand Down Expand Up @@ -617,6 +618,11 @@ template write*[M](r: MultipleChunksResponse[M], val: M): untyped =
mixin sendResponseChunk
sendResponseChunk(UntypedResponse(r), val)

template writeRawBytes*[M](
r: MultipleChunksResponse[M], bytes: openArray[byte],
contextBytes: openArray[byte]): untyped =
sendResponseChunkBytes(UntypedResponse(r), bytes, contextBytes)

template send*[M](r: SingleChunkResponse[M], val: M): untyped =
mixin sendResponseChunk
doAssert UntypedResponse(r).writtenChunks == 0
Expand Down
13 changes: 7 additions & 6 deletions beacon_chain/sync/request_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ proc init*(T: type RequestManager, network: Eth2Node,
)

proc checkResponse(roots: openArray[Eth2Digest],
blocks: openArray[ForkedSignedBeaconBlock]): bool =
blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
## This procedure checks peer's response.
var checks = @roots
if len(blocks) > len(roots):
return false
for blk in blocks:
let res = checks.find(blk.root)
let res = checks.find(blk[].root)
if res == -1:
return false
else:
Expand All @@ -75,10 +75,11 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
peer_score = peer.getScore()

let blocks = if peer.useSyncV2():
await peer.beaconBlocksByRoot_v2(BlockRootsList items)
await beaconBlocksByRoot_v2(peer, BlockRootsList items)
else:
(await peer.beaconBlocksByRoot(BlockRootsList items)).map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto:
blcks.mapIt(ForkedSignedBeaconBlock.init(it))
(await beaconBlocksByRoot(peer, BlockRootsList items)).map(
proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))

if blocks.isOk:
let ublocks = blocks.get()
Expand All @@ -88,7 +89,7 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
gotUnviableBlock = false

for b in ublocks:
let ver = await rman.blockVerifier(b)
let ver = await rman.blockVerifier(b[])
if ver.isErr():
case ver.error()
of BlockError.MissingParent:
Expand Down
62 changes: 20 additions & 42 deletions beacon_chain/sync/sync_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type
slots*: uint64

SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]]
BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]]

proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
SyncMoment(stamp: now(chronos.Moment), slots: slots)
Expand Down Expand Up @@ -157,22 +157,15 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
slot = req.slot, slot_count = req.count, step = req.step,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
if peer.useSyncV2():
try:
let res =
try:
if peer.useSyncV2():
await beaconBlocksByRange_v2(peer, req.slot, req.count, req.step)
except CancelledError:
debug "Interrupt, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return
else:
(await beaconBlocksByRange(peer, req.slot, req.count, req.step)).map(
proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))

if res.isErr():
debug "Error, while reading getBlocks response",
peer = peer, slot = req.slot, count = req.count,
Expand All @@ -181,33 +174,18 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
error = $res.error()
return
return res
else:
let res =
try:
await beaconBlocksByRange(peer, req.slot, req.count, req.step)
except CancelledError:
debug "Interrupt, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return
if res.isErr():
debug "Error, while reading getBlocks response",
peer = peer, slot = req.slot, count = req.count,
step = req.step, peer_speed = peer.netKbps(),
direction = man.direction, error = $res.error(),
topics = "syncman"
return
let forked =
res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto:
blcks.mapIt(ForkedSignedBeaconBlock.init(it))
return forked
except CancelledError:
debug "Interrupt, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return

proc remainingSlots(man: SyncManager): uint64 =
if man.direction == SyncQueueKind.Forward:
Expand Down
Loading

0 comments on commit e8e52f3

Please sign in to comment.