Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduction of address ratelimit #1230

Merged
merged 5 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions qa/pull-tester/rpc-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
'rest.py',
'httpbasics.py',
'reindex.py',
'p2p-addr.py',
'multi_rpc.py',
'zapwallettxes.py',
'proxy_test.py',
Expand Down
196 changes: 196 additions & 0 deletions qa/rpc-tests/p2p-addr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#!/usr/bin/env python3
# Copyright (c) 2022 The Dogecoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#

from test_framework.mininode import * #NodeConnCB, NODE_NETWORK, NetworkThread, NodeConn, wait_until, CAddress, msg_addr, msg_ping, msg_pong
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
import time

'''
AddrTest -- test processing of addr messages
'''

class AddrTestNode(SingleNodeConnCB):
def __init__(self, name):
SingleNodeConnCB.__init__(self)
self.ports_received = []

def add_connection(self, conn):
self.connection = conn

def getaddr(self):
self.connection.send_message(msg_getaddr())

def on_addr(self, conn, message):
for addr in message.addrs:
self.ports_received.append(addr.port)

def on_getaddr(self, conn, message):
self.getaddr_msg_received += 1

def wait_for_disconnect(self):
if self.connection == None:
return True
def is_closed():
return self.connection.state == "closed"
return wait_until(is_closed, timeout=30)

def disconnect(self):
self.connection.disconnect_node()
return self.wait_for_disconnect()

class AddrTest(BitcoinTestFramework):
def __init__(self):
super().__init__()
self.setup_clean_chain = True
self.num_nodes = 1
self.counter = 0
self.mocktime = int(time.time())
self.start_port = 10000

def run_test(self):
self.nodes[0].generate(1)

self.rate_limiting_test() # run this first so that we can test the
# initial state of 1 token
self.simple_relay_test()
self.oversized_addr_test()

self.send_node.disconnect()
for recv_node in self.recv_nodes:
recv_node.disconnect()

def setup_network(self):
self.nodes = []
self.nodes.append(start_node(0, self.options.tmpdir, ["-debug=net", "-peertimeout=999999999"]))

self.send_node = self.create_testnode()
self.recv_nodes = []
for i in range(4):
self.recv_nodes.append(self.create_testnode())

NetworkThread().start()
self.send_node.wait_for_verack()
for recv_node in self.recv_nodes:
recv_node.wait_for_verack()

def create_testnode(self, send_getaddr=False, node_idx=0):
node = AddrTestNode(send_getaddr)
conn = NodeConn('127.0.0.1', p2p_port(node_idx), self.nodes[node_idx], node)
node.add_connection(conn)
return node

def index_to_port(self, idx):
return self.start_port + idx

def last_port_sent(self):
assert self.counter > 0
return self.index_to_port(self.counter - 1)

def have_received_port(self, port):
for peer in self.recv_nodes:
if port in peer.ports_received:
return True
return False

def wait_for_specific_port(self, port):
def port_received():
return self.have_received_port(port)
return wait_until(port_received, timeout=600)

def create_addr_msg(self, num, services):
addrs = []
for i in range(num):
addr = CAddress()
addr.time = self.mocktime + random.randrange(-100, 100)
addr.nServices = services
assert self.counter < 256 ** 2 # Don't allow the returned ip addresses to wrap.
addr.ip = f"123.123.{self.counter // 256}.{self.counter % 256}"
addr.port = self.index_to_port(self.counter)
self.counter += 1
addrs.append(addr)

msg = msg_addr()
msg.addrs = addrs
return msg

def send_addr_msg(self, msg):
self.send_node.connection.send_message(msg)
time.sleep(0.5) # sleep half a second to prevent mocktime racing the msg

# invoke m_next_addr_send timer:
# `addr` messages are sent on an exponential distribution with mean interval of 30s.
# Setting the mocktime 600s forward gives a probability of (1 - e^-(600/30)) that
# the event will occur (i.e. this fails once in ~500 million repeats).
self.mocktime += 60 * 10
self.nodes[0].setmocktime(self.mocktime)

time.sleep(0.5) # sleep half a second to prevent pings racing mocktime
for peer in self.recv_nodes:
peer.sync_with_ping()

def create_and_send_addr_msg(self, num, services=NODE_NETWORK):
self.send_addr_msg(self.create_addr_msg(num, services))

def simple_relay_test(self):
# send a message with 2 addresses
self.create_and_send_addr_msg(2)

# make sure we received the last addr record
assert self.wait_for_specific_port(self.last_port_sent())

def oversized_addr_test(self):
# create message with 1010 entries and
# confirm that the node discarded the entries

# to make sure we are not rate-limited, add 1001 / 0.1 seconds
# to mocktime to allocate the maximum non-burst amount of tokens
self.mocktime += 10010
self.nodes[0].setmocktime(self.mocktime)

# send one valid message, keep track of the port it contains
valid_port_before = self.index_to_port(self.counter)
self.create_and_send_addr_msg(1)

# send a too large message that will be ignored
self.create_and_send_addr_msg(1010)

# finish with a valid message, keep track of the port it contains
valid_port_after = self.index_to_port(self.counter)
self.create_and_send_addr_msg(1)

# wait until both valid addresses were propagated
assert self.wait_for_specific_port(valid_port_before)
assert self.wait_for_specific_port(valid_port_after)

# make sure that all addresses from the invalid message were discarded
# by making sure that none of them were propagated
for port in range(valid_port_before+1, valid_port_after):
assert not self.have_received_port(port)

def rate_limiting_test(self):
# send 1 addr on connect
self.create_and_send_addr_msg(1)

# because we set mocktime after sending the message now have
# 600 * 0.1 = 60 tokens, minus the one we just sent.

# send 69 tokens
first_port = self.index_to_port(self.counter)
self.create_and_send_addr_msg(69)

# check that we have a peer with 60 processed addrs
# and 10 rate limited addrs
peerinfo = self.nodes[0].getpeerinfo()
sendingPeer = None
for info in peerinfo:
if info["addr_processed"] == 60:
sendingPeer = info
assert not sendingPeer is None
assert sendingPeer["addr_rate_limited"] == 10

if __name__ == '__main__':
AddrTest().main()
5 changes: 3 additions & 2 deletions qa/rpc-tests/test_framework/mininode.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,10 @@ def deserialize(self, f, *, with_time = True):
self.ip = socket.inet_ntoa(f.read(4))
self.port = struct.unpack(">H", f.read(2))[0]

def serialize(self):
def serialize(self, with_time=True):
r = b""
r += struct.pack("<I", 0)
if with_time:
r += struct.pack("<I", self.time)
r += struct.pack("<Q", self.nServices)
r += self.pchReserved
r += socket.inet_aton(self.ip)
Expand Down
7 changes: 7 additions & 0 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,9 @@ void CNode::copyStats(CNodeStats &stats)
X(nRecvBytes);
}
X(fWhitelisted);
X(minFeeFilter);
X(nProcessedAddrs);
X(nRatelimitedAddrs);

// It is common for nodes with good ping times to suddenly become lagged,
// due to a new block arriving or other large transfer.
Expand Down Expand Up @@ -3409,6 +3412,10 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
fGetAddr = false;
nNextLocalAddrSend = 0;
nNextAddrSend = 0;
nAddrTokenBucket = 1; // initialize to 1 to allow self-announcement
nAddrTokenTimestamp = GetTimeMicros();
nProcessedAddrs = 0;
nRatelimitedAddrs = 0;
nNextInvSend = 0;
fRelayTxes = false;
fSentAddr = false;
Expand Down
11 changes: 11 additions & 0 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ class CNodeStats
CAddress addr;
// In case this is a verified MN, this value is the proTx of the MN
uint256 verifiedProRegTxHash;
CAmount minFeeFilter;
uint64_t nProcessedAddrs;
uint64_t nRatelimitedAddrs;
};


Expand Down Expand Up @@ -792,6 +795,14 @@ class CNode
int64_t nNextAddrSend;
int64_t nNextLocalAddrSend;

/** Number of addresses that can be processed from this peer. */
double nAddrTokenBucket;
/** When nAddrTokenBucket was last updated, in microseconds */
int64_t nAddrTokenTimestamp;

std::atomic<uint64_t> nProcessedAddrs;
std::atomic<uint64_t> nRatelimitedAddrs;

// inventory based relay
CRollingBloomFilter filterInventoryKnown;
// Set of Dandelion transactions that should be known to this peer
Expand Down
43 changes: 40 additions & 3 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
{
connman.PushMessage(pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR));
pfrom->fGetAddr = true;

// When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response
// (bypassing the MAX_ADDR_PROCESSING_TOKEN_BUCKET limit).
pfrom->nAddrTokenBucket += MAX_ADDR_TO_SEND;
}
connman.MarkAddressGood(pfrom->addr);
}
Expand Down Expand Up @@ -1657,11 +1661,40 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
std::vector<CAddress> vAddrOk;
int64_t nNow = GetAdjustedTime();
int64_t nSince = nNow - 10 * 60;

// track rate limiting within this message
uint64_t nProcessedAddrs = 0;
uint64_t nRatelimitedAddrs = 0;

// Update/increment addr rate limiting bucket.
const uint64_t nCurrentTime = GetMockableTimeMicros();
if (pfrom->nAddrTokenBucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
const uint64_t nTimeElapsed = std::max(nCurrentTime - pfrom->nAddrTokenTimestamp, uint64_t(0));
const double nIncrement = nTimeElapsed * MAX_ADDR_RATE_PER_SECOND / 1e6;
pfrom->nAddrTokenBucket = std::min<double>(pfrom->nAddrTokenBucket + nIncrement, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
pfrom->nAddrTokenTimestamp = nCurrentTime;

// Randomize entries before processing, to prevent an attacker to
// determine which entries will make it through the rate limit
std::shuffle(vAddr.begin(), vAddr.end(), FastRandomContext());

BOOST_FOREACH(CAddress& addr, vAddr)
{
if (interruptMsgProc)
return true;

// apply rate limiting
if (!pfrom->fWhitelisted) {
if (pfrom->nAddrTokenBucket < 1.0) {
nRatelimitedAddrs++;
continue;
}
pfrom->nAddrTokenBucket -= 1.0;
}

nProcessedAddrs++;

if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES)
continue;

Expand All @@ -1678,6 +1711,13 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
if (fReachable)
vAddrOk.push_back(addr);
}

pfrom->nProcessedAddrs += nProcessedAddrs;
pfrom->nRatelimitedAddrs += nRatelimitedAddrs;

LogPrint("net", "Received addr: %u addresses (%u processed, %u rate-limited) peer=%d\n",
vAddr.size(), nProcessedAddrs, nRatelimitedAddrs, pfrom->GetId());

connman.AddNewAddresses(vAddrOk, pfrom->addr, 2 * 60 * 60);
if (vAddr.size() < 1000)
pfrom->fGetAddr = false;
Expand Down Expand Up @@ -1809,9 +1849,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
pfrom->AskFor(inv, doubleRequestDelay);
}
}

// Track requests for our stuff
GetMainSignals().Inventory(inv.hash);
}

if (!vToFetch.empty())
Expand Down
9 changes: 9 additions & 0 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ static const int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60;
/** Default number of orphan+recently-replaced txn to keep around for block reconstruction */
static const unsigned int DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN = 100;

/** The maximum rate of address records we're willing to process on average.
* Is bypassed for whitelisted connections. */
static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};

/** The soft limit of the address processing token bucket (the regular MAX_ADDR_RATE_PER_SECOND
* based increments won't go above this, but the MAX_ADDR_TO_SEND increment following GETADDR
* is exempt from this limit. */
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};

/** Register with a network node to receive its signals */
void RegisterNodeSignals(CNodeSignals& nodeSignals);
/** Unregister a network node */
Expand Down
8 changes: 6 additions & 2 deletions src/rpc/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ UniValue getpeerinfo(const JSONRPCRequest& request)
" n, (numeric) The heights of blocks we're currently asking from this peer\n"
" ...\n"
" ],\n"
" \"whitelisted\": true|false, (boolean) Whether the peer is whitelisted\n"
" \"addr_processed\": n, (numeric) The total number of addresses processed, excluding those dropped due to rate limiting\n"
" \"addr_rate_limited\": n, (numeric) The total number of addresses dropped due to rate limiting\n"
" \"whitelisted\": true|false, (boolean) Whether the peer is whitelisted\n"
" \"bytessent_per_msg\": {\n"
" \"addr\": n, (numeric) The total bytes sent aggregated by message type\n"
" ...\n"
Expand Down Expand Up @@ -171,7 +173,9 @@ UniValue getpeerinfo(const JSONRPCRequest& request)
}
obj.push_back(Pair("inflight", heights));
}
obj.push_back(Pair("whitelisted", stats.fWhitelisted));
obj.pushKV("addr_processed", stats.nProcessedAddrs);
obj.pushKV("addr_rate_limited", stats.nRatelimitedAddrs);
obj.pushKV("whitelisted", stats.fWhitelisted);

UniValue sendPerMsgCmd(UniValue::VOBJ);
BOOST_FOREACH(const mapMsgCmdSize::value_type &i, stats.mapSendBytesPerMsgCmd) {
Expand Down
6 changes: 6 additions & 0 deletions src/utiltime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ int64_t GetTime()
return now;
}

int64_t GetMockableTimeMicros()
{
if (nMockTime) return nMockTime * 1000000;
return GetTimeMicros();
}

void SetMockTime(int64_t nMockTimeIn)
{
nMockTime = nMockTimeIn;
Expand Down
1 change: 1 addition & 0 deletions src/utiltime.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ int64_t GetTimeMillis();
int64_t GetTimeMicros();
int64_t GetSystemTimeInSeconds(); // Like GetTime(), but not mockable
int64_t GetLogTimeMicros();
int64_t GetMockableTimeMicros();
void SetMockTime(int64_t nMockTimeIn);
void MilliSleep(int64_t n);

Expand Down