[1.0] Normally process blocks from the forkdb on startup #572

Merged
merged 5 commits on Aug 19, 2024
46 changes: 37 additions & 9 deletions libraries/chain/controller.cpp
@@ -1766,10 +1766,18 @@ struct controller_impl {
       using BSP = std::decay_t<decltype(forkdb.root())>;
 
       auto pending_head = forkdb.head();
+      auto root = forkdb.root();
+      if( pending_head ) {
+         ilog("fork database size ${s} head ${hn} : ${h}, root ${rn} : ${r}",
+              ("s", forkdb.size())("hn", pending_head->block_num())("h", pending_head->id())
+              ("rn", root->block_num())("r", root->id()));
+      } else if (root) {
+         ilog("fork database has no pending blocks root ${rn} : ${r}",
+              ("rn", root->block_num())("r", root->id()));
+      } else {
+         ilog("fork database empty, no pending or root");
+      }
       if( pending_head && blog_head && start_block_num <= blog_head->block_num() ) {
-         ilog("fork database head ${hn}:${h}, root ${rn}:${r}",
-              ("hn", pending_head->block_num())("h", pending_head->id())
-              ("rn", forkdb.root()->block_num())("r", forkdb.root()->id()));
          if( pending_head->block_num() < chain_head.block_num() || chain_head.block_num() < forkdb.root()->block_num() ) {
             ilog( "resetting fork database with new last irreversible block as the new root: ${id}", ("id", chain_head.id()) );
             fork_db_reset_root_to_chain_head();
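
The reset condition in this hunk reduces to a range test: the fork database is rebuilt with chain_head as the new root whenever chain_head falls outside [root, pending_head]. A minimal standalone sketch, assuming block numbers alone express the check (the helper name is illustrative, not actual controller code):

    #include <cstdint>

    // True when chain_head is not within the fork database's [root, head] span,
    // i.e. the case where fork_db_reset_root_to_chain_head() is called above.
    bool chain_head_outside_forkdb(uint32_t root_num, uint32_t pending_head_num, uint32_t chain_head_num) {
       return pending_head_num < chain_head_num || chain_head_num < root_num;
    }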
@@ -1969,15 +1977,35 @@ struct controller_impl {
       auto finish_init = [&](auto& forkdb) {
          if( read_mode != db_read_mode::IRREVERSIBLE ) {
             auto pending_head = forkdb.head();
-            if ( pending_head && pending_head->id() != chain_head.id() && chain_head.id() == forkdb.root()->id() ) {
-               ilog( "read_mode has changed from irreversible: applying best branch from fork database" );
+            if ( pending_head && pending_head->id() != chain_head.id() ) {
+               // chain_head equal to root means that read_mode was changed from irreversible mode to head/speculative
+               bool chain_head_is_root = chain_head.id() == forkdb.root()->id();
+               if (chain_head_is_root) {
+                  ilog( "read_mode has changed from irreversible: applying best branch from fork database" );
+               }
 
-               for( ; pending_head->id() != chain_head.id(); pending_head = forkdb.head() ) {
-                  ilog( "applying branch from fork database ending with block: ${id}", ("id", pending_head->id()) );
-                  controller::block_report br;
-                  maybe_switch_forks( br, pending_head, controller::block_status::complete, {}, trx_meta_cache_lookup{} );
-               }
+               // See comment below about pause-at-block for why `|| conf.num_configured_p2p_peers > 0`
+               if (chain_head_is_root || conf.num_configured_p2p_peers > 0) {
+                  for( ; pending_head->id() != chain_head.id(); pending_head = forkdb.head() ) {
+                     ilog( "applying branch from fork database ending with block: ${id}", ("id", pending_head->id()) );
+                     controller::block_report br;
+                     maybe_switch_forks( br, pending_head, controller::block_status::complete, {}, trx_meta_cache_lookup{} );
+                  }
+               }
             }
-         }
+         } else {
+            // It is possible that the node was shutdown with blocks to process in the fork database. For example, if
+            // it was syncing and had processed blocks into the fork database but not yet applied them. In general,
+            // it makes sense to process those blocks on startup. However, if the node was shutdown via
+            // terminate-at-block, the current expectation is that the node can be restarted to examine the state at
+            // which it was shutdown. For now, we will only process these blocks if there are peers configured. This
+            // is a bit of a hack for Spring 1.0.0 until we can add a proper pause-at-block (issue #570) which could
+            // be used to explicitly request a node to not process beyond a specified block.
Member Author (inline comment):

This compromise came after a long conversation with @arhag on potential alternatives.

+            if (conf.num_configured_p2p_peers > 0) {
+               ilog("Process blocks out of forkdb if needed");
+               log_irreversible();
+               transition_to_savanna_if_needed();
+            }
+         }
       };

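Taken together, the two branches of finish_init gate the same question; a minimal sketch of the decision, under the assumption that these three inputs fully determine it (the helper is hypothetical, not Spring's API):

    #include <cstdint>

    // Sketch: should startup process blocks already sitting in the fork database?
    // In irreversible mode that means advancing LIB (log_irreversible); otherwise
    // it means applying the best branch until the forkdb head equals the chain head
    // (assuming a pending forkdb head exists that differs from the chain head).
    bool process_forkdb_blocks_on_startup(bool irreversible_mode,
                                          bool chain_head_is_root,
                                          uint32_t num_configured_p2p_peers) {
       if (irreversible_mode)
          return num_configured_p2p_peers > 0; // Spring 1.0.0 compromise pending pause-at-block (#570)
       // read_mode changed away from irreversible, or peers would advance the chain anyway
       return chain_head_is_root || num_configured_p2p_peers > 0;
    }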
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/controller.hpp
@@ -104,6 +104,7 @@ namespace eosio::chain {
       uint32_t maximum_variable_signature_length = chain::config::default_max_variable_signature_length;
       bool disable_all_subjective_mitigations = false; //< for developer & testing purposes, can be configured using `disable-all-subjective-mitigations` when `EOSIO_DEVELOPER` build option is provided
       uint32_t terminate_at_block = 0;
+      uint32_t num_configured_p2p_peers = 0;
       bool integrity_hash_on_start= false;
       bool integrity_hash_on_stop = false;

2 changes: 2 additions & 0 deletions plugins/chain_plugin/chain_plugin.cpp
@@ -675,6 +675,8 @@ void chain_plugin_impl::plugin_initialize(const variables_map& options) {
       if( options.count( "terminate-at-block" ))
          chain_config->terminate_at_block = options.at( "terminate-at-block" ).as<uint32_t>();
 
+      chain_config->num_configured_p2p_peers = options.count( "p2p-peer-address" );
+
       // move fork_db to new location
       upgrade_from_reversible_to_fork_db( this );
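One subtlety worth noting (my reading of boost::program_options, not something the PR states): `options` is a `variables_map`, which derives from `std::map`, so `count("p2p-peer-address")` is 1 when the option was supplied at least once and 0 otherwise, regardless of how many addresses were given. That is sufficient here, since both uses of `num_configured_p2p_peers` in this PR only test `> 0`. Either of these hypothetical configurations makes the count non-zero:

    # config.ini (placeholder address)
    p2p-peer-address = peer.example.com:9876

    # or equivalently on the command line
    nodeos --p2p-peer-address peer.example.com:9876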
2 changes: 2 additions & 0 deletions plugins/net_plugin/net_plugin.cpp
@@ -2224,6 +2224,8 @@ namespace eosio {
             set_state( lib_catchup );
             sync_last_requested_num = 0;
             sync_next_expected_num = chain_info.lib_num + 1;
+         } else if (sync_next_expected_num >= sync_last_requested_num) {
+            // break
Member Author (inline comment):

This change causes the net_plugin to request more blocks if possible instead of remaining in a mode where it thinks it is already syncing.

          } else {
             peer_dlog(c, "already syncing, start sync ignored");
             return;
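
A reading of the new branch above (my inference, consistent with the author's inline comment): when sync_next_expected_num has caught up to or passed sync_last_requested_num, every block from the previous request window has already arrived and nothing is outstanding, so falling through instead of returning lets the sync manager request the next range rather than treating the call as a duplicate start-sync.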
17 changes: 11 additions & 6 deletions tests/TestHarness/Node.py
@@ -345,21 +345,26 @@ def verifyAlive(self, silent=False):
         if logStatus: Utils.Print(f'Determined node id {self.nodeId} (pid={pid}) is alive')
         return True
 
-    def rmFromCmd(self, matchValue: str):
-        '''Removes all instances of matchValue from cmd array and succeeding value if it's an option value string.'''
+    def rmFromCmd(self, matchValue: str) -> str:
+        '''Removes all instances of matchValue from cmd array and succeeding value if it's an option value string.
+           Returns the removed strings as a space-delimited string.'''
         if not self.cmd:
-            return
+            return ''
+
+        removed_items = []
 
         while True:
            try:
                i = self.cmd.index(matchValue)
-               self.cmd.pop(i)
+               removed_items.append(self.cmd.pop(i)) # Store the matchValue
                if len(self.cmd) > i:
-                   if self.cmd[i][0] != '-':
-                       self.cmd.pop(i)
+                   if self.cmd[i][0] != '-': # Check if the next value isn't an option (doesn't start with '-')
+                       removed_items.append(self.cmd.pop(i)) # Store the succeeding value
            except ValueError:
                break
 
+        return ' '.join(removed_items) # Return the removed strings as a space-delimited string

# pylint: disable=too-many-locals
# If nodeosPath is equal to None, it will use the existing nodeos path
def relaunch(self, chainArg=None, newChain=False, skipGenesis=True, timeout=Utils.systemWaitTimeout,
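A quick usage sketch of the new return value (hypothetical argv; `node` stands for any TestHarness Node instance):

    node.cmd = ['nodeos', '--p2p-peer-address', 'localhost:9877', '--plugin', 'eosio::chain_plugin']
    removed = node.rmFromCmd('--p2p-peer-address')
    assert removed == '--p2p-peer-address localhost:9877'
    assert node.cmd == ['nodeos', '--plugin', 'eosio::chain_plugin']
    # The returned string can be handed back verbatim on restart, e.g.
    # relaunch(chainArg=removed), which is how the test below restores its peers.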
35 changes: 24 additions & 11 deletions tests/nodeos_read_terminate_at_block_test.py
@@ -43,7 +43,7 @@
 def executeTest(cluster, testNodeId, testNodeArgs, resultMsgs):
     testNode = None
     testResult = False
-    resultDesc = "!!!BUG IS CONFIRMED ON TEST CASE #{} ({})".format(
+    resultDesc = "!!!BUG IS CONFIRMED ON TEST CASE #{} ({})".format(
         testNodeId,
         testNodeArgs
     )
@@ -58,6 +58,7 @@ def executeTest(cluster, testNodeId, testNodeArgs, resultMsgs):
 
     testNode = cluster.getNode(testNodeId)
     assert not testNode.verifyAlive() # resets pid so relaunch works
+    peers = testNode.rmFromCmd('--p2p-peer-address')
     testNode.relaunch(addSwapFlags={"--terminate-at-block": "9999999"})
 
     # Wait for node to start up.
@@ -75,9 +76,9 @@ def executeTest(cluster, testNodeId, testNodeArgs, resultMsgs):
     checkReplay(testNode, testNodeArgs)
 
     # verify node can be restarted after a replay
-    checkRestart(testNode, "--replay-blockchain")
+    checkRestart(testNode, "--replay-blockchain", peers)
 
-    resultDesc = "!!!TEST CASE #{} ({}) IS SUCCESSFUL".format(
+    resultDesc = "!!!TEST CASE #{} ({}) IS SUCCESSFUL".format(
         testNodeId,
         testNodeArgs
     )
@@ -144,12 +145,12 @@ def checkReplay(testNode, testNodeArgs):
     head, lib = getBlockNumInfo(testNode)
     assert head == termAtBlock, f"head {head} termAtBlock {termAtBlock}"
 
-def checkRestart(testNode, rmChainArgs):
+def checkRestart(testNode, rmChainArgs, peers):
     """Test restart of node continues"""
     if testNode and not testNode.killed:
         assert testNode.kill(signal.SIGTERM)
 
-    if not testNode.relaunch(rmArgs=rmChainArgs):
+    if not testNode.relaunch(chainArg=peers, rmArgs=rmChainArgs):
         Utils.errorExit(f"Unable to relaunch after {rmChainArgs}")
 
     assert testNode.verifyAlive(), f"relaunch failed after {rmChainArgs}"
@@ -201,7 +202,7 @@ def checkHeadOrSpeculative(head, lib):
 def executeSnapshotBlocklogTest(cluster, testNodeId, resultMsgs, nodeArgs, termAtBlock):
     testNode = cluster.getNode(testNodeId)
     testResult = False
-    resultDesc = "!!!BUG IS CONFIRMED ON TEST CASE #{} ({})".format(
+    resultDesc = "!!!BUG IS CONFIRMED ON TEST CASE #{} ({})".format(
         testNodeId,
         f"replay block log, {nodeArgs} --terminate-at-block {termAtBlock}"
     )
@@ -221,7 +222,7 @@ def executeSnapshotBlocklogTest(cluster, testNodeId, resultMsgs, nodeArgs, termAtBlock):
m=re.search(r"Block ([\d]+) reached configured maximum block", line)
if m:
assert int(m.group(1)) == termAtBlock, f"actual terminating block number {m.group(1)} not equal to expected termAtBlock {termAtBlock}"
resultDesc = f"!!!TEST CASE #{testNodeId} (replay block log, mode {nodeArgs} --terminate-at-block {termAtBlock}) IS SUCCESSFUL"
resultDesc = f"!!!TEST CASE #{testNodeId} (replay block log, mode {nodeArgs} --terminate-at-block {termAtBlock}) IS SUCCESSFUL"
testResult = True

Print(resultDesc)
@@ -266,10 +267,10 @@ def executeSnapshotBlocklogTest(cluster, testNodeId, resultMsgs, nodeArgs, termAtBlock):
0 : "--enable-stale-production"
}
regularNodeosArgs = {
1 : "--read-mode irreversible --terminate-at-block 75",
2 : "--read-mode head --terminate-at-block 100",
3 : "--read-mode speculative --terminate-at-block 125",
4 : "--read-mode irreversible --terminate-at-block 155"
1 : "--read-mode irreversible --terminate-at-block 100",
2 : "--read-mode head --terminate-at-block 125",
3 : "--read-mode speculative --terminate-at-block 150",
4 : "--read-mode irreversible --terminate-at-block 180"
Member Author (inline comment):

These block values don't really matter, I could revert to old values if desired. They were changed when I was doing some initial testing to try and reproduce the issue in this test. The changes below are what is needed.

 }
 replayNodeosArgs = {
     5 : "--read-mode irreversible",
@@ -345,6 +346,18 @@ def executeSnapshotBlocklogTest(cluster, testNodeId, resultMsgs, nodeArgs, termAtBlock):
         if not success:
             break
 
+    # Test nodes can restart and advance lib
+    if not cluster.biosNode.relaunch():
+        Utils.errorExit("Unable to restart bios node")
+
+    if not producingNode.relaunch():
+        Utils.errorExit("Unable to restart producing node")
+
+    if success:
+        for nodeId, nodeArgs in {**regularNodeosArgs, **replayNodeosArgs}.items():
+            assert cluster.getNode(nodeId).relaunch(), f"Unable to relaunch {nodeId}"
+            assert cluster.getNode(nodeId).waitForLibToAdvance(), f"LIB did not advance for {nodeId}"
+
Member Author (inline comment):

This extra section on restarting nodes failed before the fixes in this PR.


     testSuccessful = success
 
     Utils.Print("Script End ................................")