Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Reduce memory usage in import (#1239)
Browse files Browse the repository at this point in the history
There is no need to keep entire blocks during import after they have
been imported.  Keep just the hashes instead.
  • Loading branch information
shemnon authored Apr 9, 2019
1 parent 99b610d commit 5e91334
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;

import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface BlockHandler<B> {
CompletableFuture<List<B>> downloadBlocks(final List<BlockHeader> headers);
CompletableFuture<List<B>> downloadBlocks(List<BlockHeader> headers);

CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks);
CompletableFuture<List<B>> validateAndImportBlocks(List<B> blocks);

long extractBlockNumber(final B block);
long extractBlockNumber(B block);

Hash extractBlockHash(B block);

CompletableFuture<Void> executeParallelCalculations(List<B> blocks);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
Expand Down Expand Up @@ -205,13 +205,13 @@ private void clearSyncTarget(final SyncTarget syncTarget) {
syncState.clearSyncTarget();
}

private CompletableFuture<List<Block>> importBlocks(final List<BlockHeader> checkpointHeaders) {
private CompletableFuture<List<Hash>> importBlocks(final List<BlockHeader> checkpointHeaders) {
if (checkpointHeaders.isEmpty()) {
// No checkpoints to download
return CompletableFuture.completedFuture(emptyList());
}

final CompletableFuture<List<Block>> importedBlocks =
final CompletableFuture<List<Hash>> importedBlocks =
blockImportTaskFactory.importBlocksForCheckpoints(checkpointHeaders);

return importedBlocks.whenComplete(
Expand Down Expand Up @@ -261,7 +261,7 @@ private CompletableFuture<List<Block>> importBlocks(final List<BlockHeader> chec
}

public interface BlockImportTaskFactory {
CompletableFuture<List<Block>> importBlocksForCheckpoints(
CompletableFuture<List<Hash>> importBlocksForCheckpoints(
final List<BlockHeader> checkpointHeaders);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
Expand Down Expand Up @@ -129,6 +130,11 @@ public long extractBlockNumber(final BlockWithReceipts blockWithReceipt) {
return blockWithReceipt.getHeader().getNumber();
}

@Override
public Hash extractBlockHash(final BlockWithReceipts block) {
return block.getHash();
}

@Override
public CompletableFuture<Void> executeParallelCalculations(final List<BlockWithReceipts> blocks) {
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.EthTaskChainDownloader.BlockImportTaskFactory;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
Expand All @@ -30,7 +30,6 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FastSyncBlockImportTaskFactory<C> implements BlockImportTaskFactory {

Expand Down Expand Up @@ -61,7 +60,7 @@ class FastSyncBlockImportTaskFactory<C> implements BlockImportTaskFactory {
}

@Override
public CompletableFuture<List<Block>> importBlocksForCheckpoints(
public CompletableFuture<List<Hash>> importBlocksForCheckpoints(
final List<BlockHeader> checkpointHeaders) {
if (checkpointHeaders.size() < 2) {
return CompletableFuture.completedFuture(emptyList());
Expand Down Expand Up @@ -94,10 +93,6 @@ public CompletableFuture<List<Block>> importBlocksForCheckpoints(
detatchedValidationPolicy,
checkpointHeaders,
metricsSystem);
return importTask
.run()
.thenApply(
results ->
results.stream().map(BlockWithReceipts::getBlock).collect(Collectors.toList()));
return importTask.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
Expand Down Expand Up @@ -77,6 +78,11 @@ public long extractBlockNumber(final Block block) {
return block.getHeader().getNumber();
}

@Override
public Hash extractBlockHash(final Block block) {
return block.getHash();
}

@Override
public CompletableFuture<Void> executeParallelCalculations(final List<Block> blocks) {
final EthScheduler ethScheduler = ethContext.getScheduler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.sync.EthTaskChainDownloader.BlockImportTaskFactory;
Expand Down Expand Up @@ -50,9 +51,9 @@ class FullSyncBlockImportTaskFactory<C> implements BlockImportTaskFactory {
}

@Override
public CompletableFuture<List<Block>> importBlocksForCheckpoints(
public CompletableFuture<List<Hash>> importBlocksForCheckpoints(
final List<BlockHeader> checkpointHeaders) {
final CompletableFuture<List<Block>> importedBlocks;
final CompletableFuture<List<Hash>> importedHashes;
if (checkpointHeaders.size() < 2) {
// Download blocks without constraining the end block
final ImportBlocksTask<C> importTask =
Expand All @@ -63,7 +64,7 @@ public CompletableFuture<List<Block>> importBlocksForCheckpoints(
checkpointHeaders.get(0),
config.downloaderChainSegmentSize(),
metricsSystem);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
importedHashes = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final ParallelImportChainSegmentTask<C, Block> importTask =
ParallelImportChainSegmentTask.forCheckpoints(
Expand All @@ -76,8 +77,8 @@ public CompletableFuture<List<Block>> importBlocksForCheckpoints(
() -> HeaderValidationMode.DETACHED_ONLY,
checkpointHeaders,
metricsSystem);
importedBlocks = importTask.run();
importedHashes = importTask.run();
}
return importedBlocks;
return importedHashes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask;
Expand All @@ -29,6 +30,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -38,7 +40,7 @@
*
* @param <C> the consensus algorithm context
*/
public class ImportBlocksTask<C> extends AbstractPeerTask<List<Block>> {
public class ImportBlocksTask<C> extends AbstractPeerTask<List<Hash>> {
private static final Logger LOG = LogManager.getLogger();

private final ProtocolContext<C> protocolContext;
Expand Down Expand Up @@ -92,7 +94,11 @@ protected void executeTaskWithPeer(final EthPeer peer) throws PeerNotConnected {
result.get().completeExceptionally(t);
} else {
LOG.debug("Import from block {} succeeded.", startNumber);
result.get().complete(new PeerTaskResult<>(peer, r));
result
.get()
.complete(
new PeerTaskResult<>(
peer, r.stream().map(Block::getHash).collect(Collectors.toList())));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractEthTask;
Expand All @@ -34,7 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<B>> {
public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<Hash>> {
private static final Logger LOG = LogManager.getLogger();

private final EthContext ethContext;
Expand Down Expand Up @@ -149,7 +150,7 @@ protected void executeTask() {
final CompletableFuture<?> extractTxSignaturesFuture =
scheduler.scheduleServiceTask(extractTxSignaturesTask);
registerSubTask(extractTxSignaturesFuture);
final CompletableFuture<List<List<B>>> validateBodiesFuture =
final CompletableFuture<List<List<Hash>>> validateBodiesFuture =
scheduler.scheduleServiceTask(validateAndImportBodiesTask);
registerSubTask(validateBodiesFuture);

Expand Down Expand Up @@ -182,7 +183,7 @@ protected void executeTask() {
cancelOnException.accept(null, e);
} else if (r != null) {
try {
final List<B> importedBlocks =
final List<Hash> importedBlocks =
validateBodiesFuture.get().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.metrics.MetricsSystem;
Expand All @@ -21,12 +22,13 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ParallelValidateAndImportBodiesTask<B>
extends AbstractPipelinedTask<List<B>, List<B>> {
extends AbstractPipelinedTask<List<B>, List<Hash>> {
private static final Logger LOG = LogManager.getLogger();

private final BlockHandler<B> blockHandler;
Expand All @@ -42,17 +44,20 @@ public class ParallelValidateAndImportBodiesTask<B>
}

@Override
protected Optional<List<B>> processStep(
protected Optional<List<Hash>> processStep(
final List<B> blocks, final Optional<List<B>> previousBlocks) {
final long firstBlock = blockHandler.extractBlockNumber(blocks.get(0));
final long lastBlock = blockHandler.extractBlockNumber(blocks.get(blocks.size() - 1));
LOG.debug("Starting import of chain segment {} to {}", firstBlock, lastBlock);
final CompletableFuture<List<B>> importedBlocksFuture =
blockHandler.validateAndImportBlocks(blocks);
try {
final List<B> downloadedBlocks = importedBlocksFuture.get();
final List<Hash> downloadedHashes =
importedBlocksFuture.get().stream()
.map(blockHandler::extractBlockHash)
.collect(Collectors.toList());
LOG.info("Completed importing chain segment {} to {}", firstBlock, lastBlock);
return Optional.of(downloadedBlocks);
return Optional.of(downloadedHashes);
} catch (final InterruptedException | ExecutionException e) {
failExceptionally(e);
return Optional.empty();
Expand Down
Loading

0 comments on commit 5e91334

Please sign in to comment.