Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import blocks in batches #1118

Merged
merged 6 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 108 additions & 17 deletions modAionImpl/src/org/aion/zero/impl/blockchain/AionBlockchainImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -109,6 +110,7 @@
import org.aion.base.AionTxReceipt;
import org.apache.commons.collections4.map.LRUMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -130,6 +132,8 @@
public class AionBlockchainImpl implements IAionBlockchain {

private static final Logger LOG = LoggerFactory.getLogger(LogEnum.CONS.name());
private static final Logger SURVEY_LOG = LoggerFactory.getLogger(LogEnum.SURVEY.name());
private static final Logger SYNC_LOG = LoggerFactory.getLogger(LogEnum.SYNC.name());
private static final Logger TX_LOG = LoggerFactory.getLogger(LogEnum.TX.name());
private static final int THOUSAND_MS = 1000;
private static final int DIFFICULTY_BYTES = 16;
Expand Down Expand Up @@ -973,17 +977,108 @@ public synchronized ImportResult tryToConnect(final Block block) {
LOG.info("Shutting down as indicated by CLI request sync to the top {} was reached.", bestBlock.getNumber());
System.exit(SystemExitCodes.NORMAL);
}
return tryToConnectInternal(block, System.currentTimeMillis() / THOUSAND_MS);
return tryToConnectWithTimedExecution(block).getLeft();
}

public synchronized void compactState() {
repository.compactState();
/**
* Imports a batch of blocks.
*
* @param blockRange the block range to be imported
* @param peerDisplayId the display identifier for the peer who provided the batch
* @return a {@link Triple} containing:
* <ol>
* <li>the best block height after the imports,</li>
* <li>the set of imported hashes,</li>
* <li>the import result for the last imported block</li>
* </ol>
*/
public synchronized Triple<Long, Set<ByteArrayWrapper>, ImportResult> tryToConnect(final List<Block> blockRange, String peerDisplayId) {
ImportResult importResult = null;
Set<ByteArrayWrapper> imported = new HashSet<>();
for (Block block : blockRange) {
if (bestBlock.getNumber() == shutdownHook) {
LOG.info("Shutting down and dumping heap as indicated by CLI request since block number {} was reached.", shutdownHook);

try {
HeapDumper.dumpHeap(new File(System.currentTimeMillis() + "-heap-report.hprof").getAbsolutePath(), true);
} catch (Exception e) {
LOG.error("Unable to dump heap due to exception:", e);
}

// requested shutdown
System.exit(SystemExitCodes.NORMAL);
} else if (enableFullSyncCheck && reachedFullSync) {
LOG.info("Shutting down as indicated by CLI request sync to the top {} was reached.", bestBlock.getNumber());
System.exit(SystemExitCodes.NORMAL);
}

Pair<ImportResult, Long> result = tryToConnectWithTimedExecution(block);
importResult = result.getLeft();
long importTime = result.getRight();

// printing additional information when debug is enabled
SYNC_LOG.debug(
"<import-status: node = {}, hash = {}, number = {}, txs = {}, block time = {}, result = {}, time elapsed = {} ms, block td = {}, chain td = {}>",
peerDisplayId,
block.getShortHash(),
block.getNumber(),
block.getTransactionsList().size(),
block.getTimestamp(),
importResult,
importTime,
block.getTotalDifficulty(),
getTotalDifficulty());

// stop at invalid blocks
if (!importResult.isStored()) {
return Triple.of(bestBlock.getNumber(), imported, importResult);
} else {
imported.add(block.getHashWrapper());
}
}
return Triple.of(bestBlock.getNumber(), imported, importResult);
}

private long surveyTotalImportTime = 0;
private long surveyLongImportTimeCount = 0;
private long surveySuperLongImportTimeCount = 0;
private long surveyLastLogImportTime = System.currentTimeMillis();
private long surveyTotalImportedBlocks = 0;
private long surveyLongestImportTime = 0;
private final long DIVISOR_MS = 1_000_000L;
private final long ONE_SECOND = 1_000L * DIVISOR_MS;

public Pair<ImportResult, Long> tryToConnectWithTimedExecution(Block block) {
long importTime = System.nanoTime();
ImportResult importResult = tryToConnectAndFetchSummary(block, true).getLeft();
importTime = (System.nanoTime() - importTime);

if (SURVEY_LOG.isInfoEnabled()) {
if (importResult.isValid()) {
surveyLongestImportTime = Math.max(surveyLongestImportTime, importTime);
surveyTotalImportTime += importTime;
surveyTotalImportedBlocks++;
if (importTime >= (10L * ONE_SECOND)) {
surveySuperLongImportTimeCount++;
} else if (importTime >= (ONE_SECOND)) {
surveyLongImportTimeCount++;
}
}

if (System.currentTimeMillis() >= surveyLastLogImportTime + (60L * 1_000L)) {
SURVEY_LOG.info("Total import#[{}], importTime[{}]ms, 1s+Import#[{}], 10s+Import#[{}] longestImport[{}]ms",
surveyTotalImportedBlocks,
(surveyTotalImportTime / DIVISOR_MS),
surveyLongImportTimeCount,
surveySuperLongImportTimeCount,
surveyLongestImportTime / DIVISOR_MS);
surveyLastLogImportTime = System.currentTimeMillis();
}
}
return Pair.of(importResult, importTime);
}

/* TODO AKI-440: We should either refactor this to remove the redundant parameter,
or provide it as an input to isValid() */
public Pair<ImportResult, AionBlockSummary> tryToConnectAndFetchSummary(
Block block, long currTimeSeconds, boolean doExistCheck) {
public Pair<ImportResult, AionBlockSummary> tryToConnectAndFetchSummary(Block block, boolean doExistCheck) {
// Check block exists before processing more rules
if (doExistCheck // skipped when redoing imports
&& getBlockStore().getMaxNumber() >= block.getNumber()
Expand Down Expand Up @@ -1137,15 +1232,6 @@ public Pair<AionBlockSummary, RepositoryCache> tryImportWithoutFlush(final Block
return add(block, false, false);
}

/**
* Processes a new block and potentially appends it to the blockchain, thereby changing the
* state of the world. Decoupled from wrapper function {@link #tryToConnect(Block)} so we
* can feed timestamps manually
*/
ImportResult tryToConnectInternal(final Block block, long currTimeSeconds) {
return tryToConnectAndFetchSummary(block, currTimeSeconds, true).getLeft();
}

/**
* Creates a new mining block, if you require more context refer to the blockContext creation
* method, which allows us to add metadata not usually associated with the block itself.
Expand Down Expand Up @@ -2033,7 +2119,12 @@ public synchronized void setBestBlock(Block block) {

@Override
public synchronized void close() {
getBlockStore().close();
SURVEY_LOG.info("Total import#[{}], importTime[{}]ms, 1s+Import#[{}], 10s+Import#[{}] longestImport[{}]ms",
surveyTotalImportedBlocks,
(surveyTotalImportTime / DIVISOR_MS),
surveyLongImportTimeCount,
surveySuperLongImportTimeCount,
surveyLongestImportTime / DIVISOR_MS);
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ public void close() {
genLOG.info("shutdown DB... Done!");
}

blockchain.close();

this.start.set(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public void assertEqualTotalDifficulty() {

@Override
public synchronized ImportResult tryToConnect(final Block block) {
ImportResult result = tryToConnectInternal(block, System.currentTimeMillis() / 1000);
ImportResult result = tryToConnectAndFetchSummary(block, true).getLeft();

if (result == ImportResult.IMPORTED_BEST) {
BigInteger tdForHash = getBlockStore().getTotalDifficultyForHash(block.getHash());
Expand All @@ -507,7 +507,7 @@ public synchronized ImportResult tryToConnect(final Block block) {

// TEMPORARY: here to support the ConsensusTest
public synchronized Pair<ImportResult, AionBlockSummary> tryToConnectAndFetchSummary(Block block) {
return tryToConnectAndFetchSummary(block, System.currentTimeMillis() / 1000, true);
return tryToConnectAndFetchSummary(block, true);
}

/** Uses the createNewMiningBlockInternal functionality to avoid time-stamping issues. */
Expand Down
9 changes: 0 additions & 9 deletions modAionImpl/src/org/aion/zero/impl/db/AionRepositoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -881,15 +881,6 @@ public void compact() {
}
}

public void compactState() {
rwLock.writeLock().lock();
try {
this.stateDatabase.compact();
} finally {
rwLock.writeLock().unlock();
}
}

/**
* Retrieves the value for a given node from the database associated with the given type.
*
Expand Down
59 changes: 24 additions & 35 deletions modAionImpl/src/org/aion/zero/impl/db/DBUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.aion.zero.impl.types.AionBlockSummary;
import org.aion.zero.impl.types.AionTxInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

/**
* Methods used by CLI calls for debugging the local blockchain data.
Expand Down Expand Up @@ -504,9 +505,10 @@ public static void redoMainChainImport(long startHeight) {
cfg.dbFromXML();
cfg.getConsensus().setMining(false);

System.out.println("\nImporting stored blocks INITIATED...\n");

AionLoggerFactory.initAll(Map.of(LogEnum.GEN, LogLevel.INFO));
final Logger LOG = AionLoggerFactory.getLogger(LogEnum.GEN.name());

LOG.info("Importing stored blocks INITIATED...");

AionBlockchainImpl chain = new AionBlockchainImpl(cfg, false);
AionRepositoryImpl repo = chain.getRepository();
Expand All @@ -517,14 +519,13 @@ public static void redoMainChainImport(long startHeight) {
Block startBlock;
long currentBlock;
if (block != null && startHeight <= block.getNumber()) {
System.out.println(
"\nImporting the main chain from block #"
LOG.info("Importing the main chain from block #"
+ startHeight
+ " to block #"
+ block.getNumber()
+ ". This may take a while.\n"
+ "The time estimates are optimistic based on current progress.\n"
+ "It is expected that later blocks take a longer time to import due to the increasing size of the database.\n");
+ "It is expected that later blocks take a longer time to import due to the increasing size of the database.");

if (startHeight == 0L) {
// dropping databases that can be inferred when starting from genesis
Expand All @@ -535,7 +536,7 @@ public static void redoMainChainImport(long startHeight) {
AionGenesis genesis = cfg.getGenesis();
store.redoIndexWithoutSideChains(genesis); // clear the index entry
AionHubUtils.buildGenesis(genesis, repo);
System.out.println("\nFinished rebuilding genesis block.");
LOG.info("Finished rebuilding genesis block.");
startBlock = genesis;
currentBlock = 1L;
chain.setTotalDifficulty(genesis.getDifficultyBI());
Expand All @@ -550,10 +551,7 @@ public static void redoMainChainImport(long startHeight) {
boolean fail = false;

if (startBlock == null) {
System.out.println(
"The main chain block at level "
+ currentBlock
+ " is missing from the database. Cannot continue importing stored blocks.");
LOG.info("The main chain block at level {} is missing from the database. Cannot continue importing stored blocks.", currentBlock);
fail = true;
} else {
chain.setBestBlock(startBlock);
Expand All @@ -570,10 +568,7 @@ public static void redoMainChainImport(long startHeight) {
while (currentBlock <= topBlockNumber) {
block = store.getChainBlockByNumber(currentBlock);
if (block == null) {
System.out.println(
"The main chain block at level "
+ currentBlock
+ " is missing from the database. Cannot continue importing stored blocks.");
LOG.error("The main chain block at level {} is missing from the database. Cannot continue importing stored blocks.", currentBlock);
fail = true;
break;
}
Expand All @@ -582,19 +577,17 @@ public static void redoMainChainImport(long startHeight) {
// clear the index entry and prune side-chain blocks
store.redoIndexWithoutSideChains(block);
long t1 = System.currentTimeMillis();
result =
chain.tryToConnectAndFetchSummary(
block, System.currentTimeMillis() / THOUSAND_MS, false);
result = chain.tryToConnectAndFetchSummary(block, false);
long t2 = System.currentTimeMillis();
System.out.println("<import-status: hash = " + block.getShortHash() + ", number = " + block.getNumber()
LOG.info("<import-status: hash = " + block.getShortHash() + ", number = " + block.getNumber()
+ ", txs = " + block.getTransactionsList().size() + ", result = " + result.getLeft()
+ ", time elapsed = " + (t2 - t1) + " ms, td = " + chain.getTotalDifficulty() + ">");
} catch (Throwable t) {
// we want to see the exception and the block where it occurred
t.printStackTrace();
if (t.getMessage() != null
&& t.getMessage().contains("Invalid Trie state, missing node ")) {
System.out.println(
LOG.info(
"The exception above is likely due to a pruned database and NOT a consensus problem.\n"
+ "Rebuild the full state by editing the config.xml file or running ./aion.sh --state FULL.\n");
}
Expand All @@ -620,16 +613,15 @@ public AionBlockSummary getRight() {
}

if (!result.getLeft().isSuccessful()) {
System.out.println("Consensus break at block:\n" + block);
System.out.println(
"Import attempt returned result "
LOG.error("Consensus break at block:\n" + block);
LOG.info("Import attempt returned result "
+ result.getLeft()
+ " with summary\n"
+ result.getRight());

if (repo.isValidRoot(store.getBestBlock().getStateRoot())) {
System.out.println("The repository state trie was:\n");
System.out.println(repo.getTrieDump());
LOG.info("The repository state trie was:\n");
LOG.info(repo.getTrieDump());
}

fail = true;
Expand All @@ -643,8 +635,7 @@ public AionBlockSummary getRight() {
long remainingBlocks = topBlockNumber - currentBlock;
double estimate =
(timePerBlock * remainingBlocks) / 60_000 + 1; // in minutes
System.out.println(
"Finished with blocks up to "
LOG.info("Finished with blocks up to "
+ currentBlock
+ " in "
+ String.format("%.0f", time)
Expand All @@ -661,32 +652,30 @@ public AionBlockSummary getRight() {

currentBlock++;
}
System.out.println("Import from " + startHeight + " to " + topBlockNumber + " completed in " + (System.currentTimeMillis() - start) + " ms time.");
LOG.info("Import from " + startHeight + " to " + topBlockNumber + " completed in " + (System.currentTimeMillis() - start) + " ms time.");
}

if (fail) {
System.out.println("Importing stored blocks FAILED.");
LOG.info("Importing stored blocks FAILED.");
} else {
System.out.println("Importing stored blocks SUCCESSFUL.");
LOG.info("Importing stored blocks SUCCESSFUL.");
}
} else {
if (block == null) {
System.out.println(
"The best known block in null. The given database is likely empty. Nothing to do.");
LOG.info("The best known block in null. The given database is likely empty. Nothing to do.");
} else {
System.out.println(
"The given height "
LOG.info("The given height "
+ startHeight
+ " is above the best known block "
+ block.getNumber()
+ ". Nothing to do.");
}
}

System.out.println("Closing databases...");
LOG.info("Closing databases...");
repo.close();

System.out.println("Importing stored blocks COMPLETE.");
LOG.info("Importing stored blocks COMPLETE.");
}

/** @implNote Used by the CLI call. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void storePendingBlocks(final List<Block> batch, final String displayId)
this.syncStats.updatePeerBlocks(displayId, stored, BlockType.STORED);

// log operation
log.info(
log.debug(
"<import-status: STORED {} out of {} blocks from node = {}, starting with hash = {}, number = {}, txs = {}>",
stored,
batch.size(),
Expand Down
Loading