Skip to content

Commit

Permalink
Decode devp2p packets off the event thread (PegaSysEng#1439)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored and notlesh committed May 14, 2019
1 parent bdbabc9 commit a7a364e
Showing 1 changed file with 27 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.NetworkUtility;
import tech.pegasys.pantheon.util.Preconditions;

import java.io.IOException;
import java.net.BindException;
Expand Down Expand Up @@ -197,26 +196,34 @@ private void handleException(final Throwable exception) {
* @param datagram the received datagram.
*/
private void handlePacket(final DatagramPacket datagram) {
try {
final int length = datagram.data().length();
Preconditions.checkGuard(
validatePacketSize(length),
PeerDiscoveryPacketDecodingException::new,
"Packet too large. Actual size (bytes): %s",
length);

// We allow exceptions to bubble up, as they'll be picked up by the exception handler.
final Packet packet = Packet.decode(datagram.data());
// Acquire the senders coordinates to build a Peer representation from them.
final String host = datagram.sender().host();
final int port = datagram.sender().port();
final Endpoint endpoint = new Endpoint(host, port, OptionalInt.empty());
handleIncomingPacket(endpoint, packet);
} catch (final PeerDiscoveryPacketDecodingException e) {
LOG.debug("Discarding invalid peer discovery packet: {}", e.getMessage());
} catch (final Throwable t) {
LOG.error("Encountered error while handling packet", t);
final int length = datagram.data().length();
if (!validatePacketSize(length)) {
LOG.debug("Discarding over-sized packet. Actual size (bytes): " + length);
return;
}
vertx.<Packet>executeBlocking(
future -> {
try {
future.complete(Packet.decode(datagram.data()));
} catch (final Throwable t) {
future.fail(t);
}
},
event -> {
if (event.succeeded()) {
// Acquire the senders coordinates to build a Peer representation from them.
final String host = datagram.sender().host();
final int port = datagram.sender().port();
final Endpoint endpoint = new Endpoint(host, port, OptionalInt.empty());
handleIncomingPacket(endpoint, event.result());
} else {
if (event.cause() instanceof PeerDiscoveryPacketDecodingException) {
LOG.debug("Discarding invalid peer discovery packet: {}", event.cause().getMessage());
} else {
LOG.error("Encountered error while handling packet", event.cause());
}
}
});
}

private class VertxAsyncExecutor implements AsyncExecutor {
Expand Down

0 comments on commit a7a364e

Please sign in to comment.