Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

harden and speed up block sync #3358

Merged
merged 2 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ proc getRequestProtoName(fn: NimNode): NimNode =

proc writeChunk*(conn: Connection,
responseCode: Option[ResponseCode],
payload: Bytes,
contextBytes: openarray[byte] = []): Future[void] =
payload: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] =
var output = memoryOutput()

try:
Expand Down Expand Up @@ -558,13 +558,14 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
finally:
await stream.close()

proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes): Future[void] =
proc sendResponseChunkBytes(
response: UntypedResponse, payload: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] =
inc response.writtenChunks
response.stream.writeChunk(some Success, payload)
response.stream.writeChunk(some Success, payload, contextBytes)

proc sendResponseChunk*(response: UntypedResponse, val: auto): Future[void] =
inc response.writtenChunks
response.stream.writeChunk(some Success, SSZ.encode(val))
sendResponseChunkBytes(response, SSZ.encode(val))

template sendUserHandlerResultAsChunkImpl*(stream: Connection,
handlerResultFut: Future): untyped =
Expand Down Expand Up @@ -617,6 +618,11 @@ template write*[M](r: MultipleChunksResponse[M], val: M): untyped =
mixin sendResponseChunk
sendResponseChunk(UntypedResponse(r), val)

template writeRawBytes*[M](
r: MultipleChunksResponse[M], bytes: openArray[byte],
contextBytes: openArray[byte]): untyped =
sendResponseChunkBytes(UntypedResponse(r), bytes, contextBytes)

template send*[M](r: SingleChunkResponse[M], val: M): untyped =
mixin sendResponseChunk
doAssert UntypedResponse(r).writtenChunks == 0
Expand Down
13 changes: 7 additions & 6 deletions beacon_chain/sync/request_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ proc init*(T: type RequestManager, network: Eth2Node,
)

proc checkResponse(roots: openArray[Eth2Digest],
blocks: openArray[ForkedSignedBeaconBlock]): bool =
blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
## This procedure checks peer's response.
var checks = @roots
if len(blocks) > len(roots):
return false
for blk in blocks:
let res = checks.find(blk.root)
let res = checks.find(blk[].root)
if res == -1:
return false
else:
Expand All @@ -75,10 +75,11 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
peer_score = peer.getScore()

let blocks = if peer.useSyncV2():
await peer.beaconBlocksByRoot_v2(BlockRootsList items)
await beaconBlocksByRoot_v2(peer, BlockRootsList items)
else:
(await peer.beaconBlocksByRoot(BlockRootsList items)).map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto:
blcks.mapIt(ForkedSignedBeaconBlock.init(it))
(await beaconBlocksByRoot(peer, BlockRootsList items)).map(
Copy link
Contributor

@zah zah Feb 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still quite a lot of copying going on here. Would you like the network layer to allocate the blocks as refs directly? We can also bite the bullet and start using refs inside the branches of ForkedSignedBeaconBlock to avoid more copying in other places as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> start using refs inside the branches of ForkedSignedBeaconBlock to avoid more copying in other places as well.

This goes a bit too far - it would infect the spec code with the weak guarantees of ref. Already, the tests needed changing because of the mutability difference between value types and reference types, and this is not a source of bugs I'd like to have to consider inside the security moat that chaindag is.

Also, it's actually not that hard to make the code avoid these copies as well - but semantically, what I am really waiting for is move + lent support in the language. With that in place we can write semantically correct code and avoid most copies in a principled way. The profiler says that this copy is not really an issue, but should it turn into one, there's a simple way to avoid it as well, especially if we drop the v1 request (which should be safe to do).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, it's possible to use ref fields inside the "Forked" types and still have a similar external view over the data if we introduce some accessors in place of the currently directly exposed fields. The accessors will dereference the ref fields and will preserve the property that an immutable Forked object can provide you only with immutable fields.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, it's a good workaround to keep in mind should we find that we need it - at this point, even at backfill speeds we really don't - also because it's only needed in asynchronous layers, in general - in non-async code, we're already pretty good at avoiding copies - this is another reason to prefer explicit ref Xxx over ref object: we can be precise about where a reference is necessary (either for semantic or efficiency reasons).

The sync manager code in particular was vulnerable to this effect because it was shuffling seq:s of blocks around in several queues - it would in fact have been possible to use a ref seq[Forked... instead for the same efficiency benefit.

All that said, I'm not strongly opposed to the idea, but I find it an ugly hack / workaround that I would only introduce if forced, since better options (hopefully) will work in future versions of Nim.

This also reminds me of the importance of having `let a = b.x` be an alias, and not a copy - this is a critical change that needs to happen in Nim for all my beautiful theory to become practical - that, and fixing all the unnecessary-temporary bugs, of which there are plenty.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in writing this comment, I started liking the ref seq idea even more, fwiw ;)

proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))

if blocks.isOk:
let ublocks = blocks.get()
Expand All @@ -88,7 +89,7 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
gotUnviableBlock = false

for b in ublocks:
let ver = await rman.blockVerifier(b)
let ver = await rman.blockVerifier(b[])
if ver.isErr():
case ver.error()
of BlockError.MissingParent:
Expand Down
62 changes: 20 additions & 42 deletions beacon_chain/sync/sync_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type
slots*: uint64

SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]]
BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]]

proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
SyncMoment(stamp: now(chronos.Moment), slots: slots)
Expand Down Expand Up @@ -157,22 +157,15 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
slot = req.slot, slot_count = req.count, step = req.step,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
if peer.useSyncV2():
try:
let res =
try:
if peer.useSyncV2():
await beaconBlocksByRange_v2(peer, req.slot, req.count, req.step)
except CancelledError:
debug "Interrupt, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return
else:
(await beaconBlocksByRange(peer, req.slot, req.count, req.step)).map(
proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))

if res.isErr():
debug "Error, while reading getBlocks response",
peer = peer, slot = req.slot, count = req.count,
Expand All @@ -181,33 +174,18 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
error = $res.error()
return
return res
else:
let res =
try:
await beaconBlocksByRange(peer, req.slot, req.count, req.step)
except CancelledError:
debug "Interrupt, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return
if res.isErr():
debug "Error, while reading getBlocks response",
peer = peer, slot = req.slot, count = req.count,
step = req.step, peer_speed = peer.netKbps(),
direction = man.direction, error = $res.error(),
topics = "syncman"
return
let forked =
res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto:
blcks.mapIt(ForkedSignedBeaconBlock.init(it))
return forked
except CancelledError:
debug "Interrupt, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return

proc remainingSlots(man: SyncManager): uint64 =
if man.direction == SyncQueueKind.Forward:
Expand Down
Loading