Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes race condition between NodeAddedListener and FullBlockImportStep #56

Merged
merged 9 commits into from
Oct 3, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,9 @@ public MarkSweepPruner(
}

public void prepare() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case.
markStorage.clear();
pendingMarks.clear();
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes);
}

public void cleanup() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
}

public void mark(final Hash rootHash) {
markOperationCounter.inc();
createStateTrie(rootHash)
Expand Down Expand Up @@ -151,9 +144,18 @@ public void sweepBefore(final long markedBlockNumber) {
// Sweep non-state-root nodes
prunedNodeCount += worldStateStorage.prune(this::isMarked);
sweptNodesCounter.inc(prunedNodeCount);
clearMarks();
LOG.debug("Completed sweeping unused nodes");
}

public void cleanup() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
clearMarks();
}

public void clearMarks() {
markStorage.clear();
LOG.debug("Completed sweeping unused nodes");
pendingMarks.clear();
}

private boolean isMarked(final Bytes32 key) {
Expand Down Expand Up @@ -190,7 +192,14 @@ private void processAccountState(final BytesValue value) {

@VisibleForTesting
void markNode(final Bytes32 hash) {
markNodes(Collections.singleton(hash));
markedNodesCounter.inc();
markLock.lock();
try {
pendingMarks.add(hash);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
Comment on lines +200 to +207
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marking is a hot loop so I undid the DRY I did before

}

private void markNodes(final Collection<Bytes32> nodeHashes) {
Expand All @@ -210,7 +219,7 @@ private void maybeFlushPendingMarks() {
}
}

void flushPendingMarks() {
private void flushPendingMarks() {
markLock.lock();
try {
final KeyValueStorageTransaction transaction = markStorage.startTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public Pruner(

public void start() {
LOG.info("Starting Pruner.");
pruningStrategy.prepare();
blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event));
}

Expand All @@ -73,7 +74,6 @@ private void handleNewBlock(final BlockAddedEvent event) {

final long blockNumber = event.getBlock().getHeader().getNumber();
if (state.compareAndSet(State.IDLE, State.MARK_BLOCK_CONFIRMATIONS_AWAITING)) {
pruningStrategy.prepare();
markBlockNumber = blockNumber;
} else if (blockNumber >= markBlockNumber + blockConfirmations
&& state.compareAndSet(State.MARK_BLOCK_CONFIRMATIONS_AWAITING, State.MARKING)) {
Expand All @@ -87,7 +87,6 @@ private void handleNewBlock(final BlockAddedEvent event) {
}

private void mark(final BlockHeader header) {
markBlockNumber = header.getNumber();
final Hash stateRoot = header.getStateRoot();
LOG.debug(
"Begin marking used nodes for pruning. Block number: {} State root: {}",
Expand Down Expand Up @@ -117,6 +116,7 @@ private void execute(final Runnable action) {
executorService.execute(action);
} catch (final Throwable t) {
LOG.error("Pruning failed", t);
pruningStrategy.cleanup();
state.set(State.IDLE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ private void handleFastSyncResult(final FastSyncState result, final Throwable er
}

private void startFullSync() {
fullSyncDownloader.start();
maybePruner.ifPresent(Pruner::start);
fullSyncDownloader.start();
}

@Override
Expand Down