diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java index 2f603d5bd6..1815484e4f 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java @@ -180,11 +180,6 @@ private static CachingTaskCollection createWorldStateDownloader "Pending request cache size for fast sync world state download", taskCollection::cacheSize); - // We're using the CachingTaskCollection which isn't designed to reliably persist all - // added tasks. We therefore can't resume from previously added tasks. - // So for now, clear tasks when we start up. - taskCollection.clear(); - return taskCollection; } } diff --git a/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java index 9147a717c9..ab3353c728 100644 --- a/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java +++ b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java @@ -19,7 +19,6 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.nio.file.Path; -import java.util.Comparator; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,7 +37,6 @@ public class RocksDbTaskQueue implements TaskCollection { private long lastEnqueuedKey = 0; private long lastDequeuedKey = 0; - private long oldestKey = 0; private RocksIterator dequeueIterator; private long lastValidKeyFromIterator; private final Set> outstandingTasks = new HashSet<>(); @@ -60,7 +58,9 @@ private RocksDbTaskQueue( this.deserializer = deserializer; try { RocksDbUtil.loadNativeLibrary(); - options = new Options().setCreateIfMissing(true); + // We don't support reloading data so ensure we're starting from a clean slate. + RocksDB.destroyDB(storageDirectory.toString(), new Options()); + options = new Options().setCreateIfMissing(true).setErrorIfExists(true); db = RocksDB.open(options, storageDirectory.toString()); enqueueLatency = @@ -74,29 +74,11 @@ private RocksDbTaskQueue( "dequeue_latency_seconds", "Latency for dequeuing an item."); - // Initialize queue from existing db - initializeQueue(); } catch (final RocksDBException e) { throw new StorageException(e); } } - private void initializeQueue() { - RocksIterator iter = db.newIterator(); - iter.seekToFirst(); - if (!iter.isValid()) { - // There is no data yet, nothing to do - return; - } - long firstKey = Longs.fromByteArray(iter.key()); - iter.seekToLast(); - long lastKey = Longs.fromByteArray(iter.key()); - - lastDequeuedKey = firstKey - 1; - oldestKey = firstKey; - lastEnqueuedKey = lastKey; - } - public static RocksDbTaskQueue create( final Path storageDirectory, final Function serializer, @@ -167,7 +149,7 @@ public synchronized boolean isEmpty() { public synchronized void clear() { assertNotClosed(); outstandingTasks.clear(); - final byte[] from = Longs.toByteArray(oldestKey); + final byte[] from = Longs.toByteArray(0); final byte[] to = Longs.toByteArray(lastEnqueuedKey + 1); try { db.deleteRange(from, to); @@ -177,7 +159,6 @@ public synchronized void clear() { } lastDequeuedKey = 0; lastEnqueuedKey = 0; - oldestKey = 0; } catch (final RocksDBException e) { throw new StorageException(e); } @@ -188,26 +169,6 @@ public synchronized boolean allTasksCompleted() { return isEmpty() && outstandingTasks.isEmpty(); } - private synchronized void deleteCompletedTasks() { - final long oldestOutstandingKey = - outstandingTasks.stream() - .min(Comparator.comparingLong(RocksDbTask::getKey)) - .map(RocksDbTask::getKey) - .orElse(lastDequeuedKey + 1); - - if (oldestKey < oldestOutstandingKey) { - // Delete all contiguous completed tasks - final byte[] fromKey = Longs.toByteArray(oldestKey); - final byte[] toKey = Longs.toByteArray(oldestOutstandingKey); - try { - db.deleteRange(fromKey, toKey); - oldestKey = oldestOutstandingKey; - } catch (final RocksDBException e) { - throw new StorageException(e); - } - } - } - @Override public synchronized void close() { if (closed) { @@ -228,11 +189,7 @@ private void assertNotClosed() { } private synchronized boolean markTaskCompleted(final RocksDbTask task) { - if (outstandingTasks.remove(task)) { - deleteCompletedTasks(); - return true; - } - return false; + return outstandingTasks.remove(task); } private synchronized void handleFailedTask(final RocksDbTask task) { diff --git a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java index 732787894a..835c275d68 100644 --- a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java +++ b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java @@ -12,8 +12,6 @@ */ package tech.pegasys.pantheon.services.tasks; -import static org.assertj.core.api.Assertions.assertThat; - import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -22,7 +20,6 @@ import java.util.function.Function; import org.junit.Rule; -import org.junit.Test; import org.junit.rules.TemporaryFolder; public class RocksDbTaskQueueTest extends AbstractTaskQueueTest> { @@ -39,43 +36,4 @@ private RocksDbTaskQueue createQueue(final Path dataDir) { return RocksDbTaskQueue.create( dataDir, Function.identity(), Function.identity(), new NoOpMetricsSystem()); } - - @Test - public void shouldResumeFromExistingQueue() throws Exception { - testResumeFromExistingQueue(10); - } - - @Test - public void shouldResumeFromExistingQueueWithOneElement() throws Exception { - testResumeFromExistingQueue(1); - } - - @Test - public void shouldResumeFromExistingQueueWithNoElements() throws Exception { - testResumeFromExistingQueue(0); - } - - private void testResumeFromExistingQueue(final int elementCount) throws Exception { - final Path dataDir = folder.newFolder().toPath(); - try (final RocksDbTaskQueue queue = createQueue(dataDir)) { - for (int i = 0; i < elementCount; i++) { - queue.add(BytesValue.of(i)); - } - } - - try (final RocksDbTaskQueue resumedQueue = createQueue(dataDir)) { - assertThat(resumedQueue.size()).isEqualTo(elementCount); - // Queue an additional element - resumedQueue.add(BytesValue.of(99)); - assertThat(resumedQueue.size()).isEqualTo(elementCount + 1); - - // Check that everything dequeues in order as expected - for (int i = 0; i < elementCount; i++) { - assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(i)); - } - assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(99)); - - assertThat(resumedQueue.size()).isEqualTo(0); - } - } }