Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: crawler-friendly handshake #3982

Merged
merged 1 commit into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var DefaultConfig = Config{
ListenAddr: ":30303",
ListenAddr65: ":30304",
MaxPeers: 100,
MaxPendingPeers: 50,
MaxPendingPeers: 1000,
NAT: nat.Any(),
},
}
17 changes: 4 additions & 13 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,13 +786,12 @@ running:
// Ensure that the trusted flag is set before checking against MaxPeers.
c.flags |= trustedConn
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)
c.cont <- nil

case c := <-srv.checkpointAddPeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
err := srv.addPeerChecks(peers, inboundCount, c)
err := srv.postHandshakeChecks(peers, inboundCount, c)
if err == nil {
// The handshakes are done and it passed all checks.
p := srv.launchPeer(c, c.pubkey)
Expand Down Expand Up @@ -850,21 +849,13 @@ func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount in
return DiscAlreadyConnected
case c.node.ID() == srv.localnode.ID():
return DiscSelf
case (len(srv.Protocols) > 0) && (countMatchingProtocols(srv.Protocols, c.caps) == 0):
return DiscUselessPeer
default:
return nil
}
}

func (srv *Server) addPeerChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
// Drop connections with no matching protocols.
if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, c.caps) == 0 {
return DiscUselessPeer
}
// Repeat the post-handshake checks because the
// peer set might have changed since those checks were performed.
return srv.postHandshakeChecks(peers, inboundCount, c)
}

// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop(ctx context.Context) {
Expand Down
63 changes: 37 additions & 26 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,22 +274,30 @@ func TestServerAtCap(t *testing.T) {
}

// Inject a few connections to fill up the peer set.
for i := 0; i < 10; i++ {
for i := 0; i < srv.Config.MaxPeers; i++ {
c := newconn(randomID())
if err := srv.checkpoint(c, srv.checkpointAddPeer); err != nil {
t.Fatalf("could not add conn %d: %v", i, err)
}
}

// Try inserting a non-trusted connection.
anotherID := randomID()
c := newconn(anotherID)
if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != DiscTooManyPeers {
if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != nil {
t.Error("unexpected error @ checkpointPostHandshake:", err)
}
if err := srv.checkpoint(c, srv.checkpointAddPeer); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err)
}

// Try inserting a trusted connection.
c = newconn(trustedID)
if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != nil {
t.Error("unexpected error for trusted conn @posthandshake:", err)
t.Error("unexpected error @ checkpointPostHandshake:", err)
}
if err := srv.checkpoint(c, srv.checkpointAddPeer); err != nil {
t.Error("unexpected error for trusted conn:", err)
}
if !c.is(trustedConn) {
t.Error("Server did not set trusted flag")
Expand All @@ -298,15 +306,21 @@ func TestServerAtCap(t *testing.T) {
// Remove from trusted set and try again
srv.RemoveTrustedPeer(newNode(trustedID, ""))
c = newconn(trustedID)
if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != DiscTooManyPeers {
if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != nil {
t.Error("unexpected error @ checkpointPostHandshake:", err)
}
if err := srv.checkpoint(c, srv.checkpointAddPeer); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err)
}

// Add anotherID to trusted set and try again
srv.AddTrustedPeer(newNode(anotherID, ""))
c = newconn(anotherID)
if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != nil {
t.Error("unexpected error for trusted conn @posthandshake:", err)
t.Error("unexpected error @ checkpointPostHandshake:", err)
}
if err := srv.checkpoint(c, srv.checkpointAddPeer); err != nil {
t.Error("unexpected error for trusted conn:", err)
}
if !c.is(trustedConn) {
t.Error("Server did not set trusted flag")
Expand All @@ -322,8 +336,7 @@ func TestServerPeerLimits(t *testing.T) {
pubkey: &clientkey.PublicKey,
phs: protoHandshake{
Pubkey: crypto.MarshalPubkey(&clientkey.PublicKey),
// Force "DiscUselessPeer" due to unmatching caps
// Caps: []Cap{discard.cap()},
Caps: []Cap{discard.cap()},
},
}

Expand All @@ -348,35 +361,31 @@ func TestServerPeerLimits(t *testing.T) {
flags := dynDialedConn
dialDest := clientnode
conn, _ := net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr != DiscTooManyPeers {
t.Errorf("unexpected close error: %q", tp.closeErr)
err := srv.SetupConn(conn, flags, dialDest)
_ = conn.Close()
if !errors.Is(err, DiscTooManyPeers) {
t.Fatalf("expected DiscTooManyPeers, but got error: %q", err)
}
conn.Close()

srv.AddTrustedPeer(clientnode)

// Check that server allows a trusted peer despite being full.
conn, _ = net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr == DiscTooManyPeers {
t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
err = srv.SetupConn(conn, flags, dialDest)
_ = conn.Close()
if err != nil {
t.Fatalf("failed to bypass MaxPeers with trusted node: %q", err)
}

if tp.closeErr != DiscUselessPeer {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()

srv.RemoveTrustedPeer(clientnode)

// Check that server is full again.
conn, _ = net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr != DiscTooManyPeers {
t.Errorf("unexpected close error: %q", tp.closeErr)
err = srv.SetupConn(conn, flags, dialDest)
_ = conn.Close()
if !errors.Is(err, DiscTooManyPeers) {
t.Fatalf("expected DiscTooManyPeers, but got error: %q", err)
}
conn.Close()
}

func TestServerSetupConn(t *testing.T) {
Expand Down Expand Up @@ -423,7 +432,7 @@ func TestServerSetupConn(t *testing.T) {
{
tt: &setupTransport{pubkey: srvpub, phs: protoHandshake{Pubkey: crypto.MarshalPubkey(srvpub)}},
flags: inboundConn,
wantCalls: "doEncHandshake,close,",
wantCalls: "doEncHandshake,doProtoHandshake,close,",
wantCloseErr: DiscSelf,
},
{
Expand Down Expand Up @@ -490,17 +499,19 @@ func (c *setupTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake,
}
return &c.phs, nil
}

func (c *setupTransport) close(err error) {
c.calls += "close,"
c.closeErr = err
}

// setupConn shouldn't write to/read from the connection.
func (c *setupTransport) WriteMsg(Msg) error {
panic("WriteMsg called on setupTransport")
return errors.New("WriteMsg called on setupTransport")
}

func (c *setupTransport) ReadMsg() (Msg, error) {
panic("ReadMsg called on setupTransport")
return Msg{}, errors.New("ReadMsg called on setupTransport")
}

func newkey() *ecdsa.PrivateKey {
Expand Down