Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Reduce memory usage in import #1239

Merged
merged 2 commits into from
Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;

import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface BlockHandler<B> {
CompletableFuture<List<B>> downloadBlocks(final List<BlockHeader> headers);
CompletableFuture<List<B>> downloadBlocks(List<BlockHeader> headers);

CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks);
CompletableFuture<List<B>> validateAndImportBlocks(List<B> blocks);

long extractBlockNumber(final B block);
long extractBlockNumber(B block);

Hash extractBlockHash(B block);

CompletableFuture<Void> executeParallelCalculations(List<B> blocks);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
Expand Down Expand Up @@ -205,13 +205,13 @@ private void clearSyncTarget(final SyncTarget syncTarget) {
syncState.clearSyncTarget();
}

private CompletableFuture<List<Block>> importBlocks(final List<BlockHeader> checkpointHeaders) {
private CompletableFuture<List<Hash>> importBlocks(final List<BlockHeader> checkpointHeaders) {
if (checkpointHeaders.isEmpty()) {
// No checkpoints to download
return CompletableFuture.completedFuture(emptyList());
}

final CompletableFuture<List<Block>> importedBlocks =
final CompletableFuture<List<Hash>> importedBlocks =
blockImportTaskFactory.importBlocksForCheckpoints(checkpointHeaders);

return importedBlocks.whenComplete(
Expand Down Expand Up @@ -261,7 +261,7 @@ private CompletableFuture<List<Block>> importBlocks(final List<BlockHeader> chec
}

public interface BlockImportTaskFactory {
CompletableFuture<List<Block>> importBlocksForCheckpoints(
CompletableFuture<List<Hash>> importBlocksForCheckpoints(
final List<BlockHeader> checkpointHeaders);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
Expand Down Expand Up @@ -129,6 +130,11 @@ public long extractBlockNumber(final BlockWithReceipts blockWithReceipt) {
return blockWithReceipt.getHeader().getNumber();
}

@Override
public Hash extractBlockHash(final BlockWithReceipts block) {
return block.getHash();
}

@Override
public CompletableFuture<Void> executeParallelCalculations(final List<BlockWithReceipts> blocks) {
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.EthTaskChainDownloader.BlockImportTaskFactory;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
Expand All @@ -30,7 +30,6 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FastSyncBlockImportTaskFactory<C> implements BlockImportTaskFactory {

Expand Down Expand Up @@ -61,7 +60,7 @@ class FastSyncBlockImportTaskFactory<C> implements BlockImportTaskFactory {
}

@Override
public CompletableFuture<List<Block>> importBlocksForCheckpoints(
public CompletableFuture<List<Hash>> importBlocksForCheckpoints(
final List<BlockHeader> checkpointHeaders) {
if (checkpointHeaders.size() < 2) {
return CompletableFuture.completedFuture(emptyList());
Expand Down Expand Up @@ -94,10 +93,6 @@ public CompletableFuture<List<Block>> importBlocksForCheckpoints(
detatchedValidationPolicy,
checkpointHeaders,
metricsSystem);
return importTask
.run()
.thenApply(
results ->
results.stream().map(BlockWithReceipts::getBlock).collect(Collectors.toList()));
return importTask.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
Expand Down Expand Up @@ -77,6 +78,11 @@ public long extractBlockNumber(final Block block) {
return block.getHeader().getNumber();
}

@Override
public Hash extractBlockHash(final Block block) {
return block.getHash();
}

@Override
public CompletableFuture<Void> executeParallelCalculations(final List<Block> blocks) {
final EthScheduler ethScheduler = ethContext.getScheduler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.sync.EthTaskChainDownloader.BlockImportTaskFactory;
Expand Down Expand Up @@ -50,9 +51,9 @@ class FullSyncBlockImportTaskFactory<C> implements BlockImportTaskFactory {
}

@Override
public CompletableFuture<List<Block>> importBlocksForCheckpoints(
public CompletableFuture<List<Hash>> importBlocksForCheckpoints(
final List<BlockHeader> checkpointHeaders) {
final CompletableFuture<List<Block>> importedBlocks;
final CompletableFuture<List<Hash>> importedHashes;
if (checkpointHeaders.size() < 2) {
// Download blocks without constraining the end block
final ImportBlocksTask<C> importTask =
Expand All @@ -63,7 +64,7 @@ public CompletableFuture<List<Block>> importBlocksForCheckpoints(
checkpointHeaders.get(0),
config.downloaderChainSegmentSize(),
metricsSystem);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
importedHashes = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final ParallelImportChainSegmentTask<C, Block> importTask =
ParallelImportChainSegmentTask.forCheckpoints(
Expand All @@ -76,8 +77,8 @@ public CompletableFuture<List<Block>> importBlocksForCheckpoints(
() -> HeaderValidationMode.DETACHED_ONLY,
checkpointHeaders,
metricsSystem);
importedBlocks = importTask.run();
importedHashes = importTask.run();
}
return importedBlocks;
return importedHashes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask;
Expand All @@ -29,6 +30,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -38,7 +40,7 @@
*
* @param <C> the consensus algorithm context
*/
public class ImportBlocksTask<C> extends AbstractPeerTask<List<Block>> {
public class ImportBlocksTask<C> extends AbstractPeerTask<List<Hash>> {
private static final Logger LOG = LogManager.getLogger();

private final ProtocolContext<C> protocolContext;
Expand Down Expand Up @@ -92,7 +94,11 @@ protected void executeTaskWithPeer(final EthPeer peer) throws PeerNotConnected {
result.get().completeExceptionally(t);
} else {
LOG.debug("Import from block {} succeeded.", startNumber);
result.get().complete(new PeerTaskResult<>(peer, r));
result
.get()
.complete(
new PeerTaskResult<>(
peer, r.stream().map(Block::getHash).collect(Collectors.toList())));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractEthTask;
Expand All @@ -34,7 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<B>> {
public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<Hash>> {
private static final Logger LOG = LogManager.getLogger();

private final EthContext ethContext;
Expand Down Expand Up @@ -149,7 +150,7 @@ protected void executeTask() {
final CompletableFuture<?> extractTxSignaturesFuture =
scheduler.scheduleServiceTask(extractTxSignaturesTask);
registerSubTask(extractTxSignaturesFuture);
final CompletableFuture<List<List<B>>> validateBodiesFuture =
final CompletableFuture<List<List<Hash>>> validateBodiesFuture =
scheduler.scheduleServiceTask(validateAndImportBodiesTask);
registerSubTask(validateBodiesFuture);

Expand Down Expand Up @@ -182,7 +183,7 @@ protected void executeTask() {
cancelOnException.accept(null, e);
} else if (r != null) {
try {
final List<B> importedBlocks =
final List<Hash> importedBlocks =
validateBodiesFuture.get().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.metrics.MetricsSystem;
Expand All @@ -21,12 +22,13 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ParallelValidateAndImportBodiesTask<B>
extends AbstractPipelinedTask<List<B>, List<B>> {
extends AbstractPipelinedTask<List<B>, List<Hash>> {
private static final Logger LOG = LogManager.getLogger();

private final BlockHandler<B> blockHandler;
Expand All @@ -42,17 +44,20 @@ public class ParallelValidateAndImportBodiesTask<B>
}

@Override
protected Optional<List<B>> processStep(
protected Optional<List<Hash>> processStep(
final List<B> blocks, final Optional<List<B>> previousBlocks) {
final long firstBlock = blockHandler.extractBlockNumber(blocks.get(0));
final long lastBlock = blockHandler.extractBlockNumber(blocks.get(blocks.size() - 1));
LOG.debug("Starting import of chain segment {} to {}", firstBlock, lastBlock);
final CompletableFuture<List<B>> importedBlocksFuture =
blockHandler.validateAndImportBlocks(blocks);
try {
final List<B> downloadedBlocks = importedBlocksFuture.get();
final List<Hash> downloadedHashes =
importedBlocksFuture.get().stream()
.map(blockHandler::extractBlockHash)
.collect(Collectors.toList());
LOG.info("Completed importing chain segment {} to {}", firstBlock, lastBlock);
return Optional.of(downloadedBlocks);
return Optional.of(downloadedHashes);
} catch (final InterruptedException | ExecutionException e) {
failExceptionally(e);
return Optional.empty();
Expand Down
Loading