diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java index 0aca7e6cdd..4ff17ca395 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java @@ -29,10 +29,10 @@ import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.OperationTimer; -import tech.pegasys.pantheon.services.queue.BigQueue; -import tech.pegasys.pantheon.services.queue.BytesQueue; -import tech.pegasys.pantheon.services.queue.BytesQueueAdapter; -import tech.pegasys.pantheon.services.queue.RocksDbQueue; +import tech.pegasys.pantheon.services.queue.BytesTaskQueue; +import tech.pegasys.pantheon.services.queue.BytesTaskQueueAdapter; +import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue; +import tech.pegasys.pantheon.services.queue.TaskQueue; import java.io.File; import java.io.IOException; @@ -50,14 +50,14 @@ class FastSynchronizer { private final FastSyncDownloader fastSyncDownloader; private final Path fastSyncDataDirectory; - private final BigQueue stateQueue; + private final TaskQueue stateQueue; private final WorldStateDownloader worldStateDownloader; private final FastSyncState initialSyncState; private FastSynchronizer( final FastSyncDownloader fastSyncDownloader, final Path fastSyncDataDirectory, - final BigQueue stateQueue, + final TaskQueue stateQueue, final WorldStateDownloader worldStateDownloader, final FastSyncState initialSyncState) { this.fastSyncDownloader = fastSyncDownloader; @@ -94,7 +94,7 @@ public static Optional> create( return Optional.empty(); } - final BigQueue stateQueue = + final TaskQueue stateQueue = createWorldStateDownloaderQueue(getStateQueueDirectory(dataDirectory), metricsSystem); final WorldStateDownloader worldStateDownloader = new WorldStateDownloader( @@ -166,10 +166,10 @@ private static void ensureDirectoryExists(final File dir) { } } - private static BigQueue createWorldStateDownloaderQueue( + private static TaskQueue createWorldStateDownloaderQueue( final Path dataDirectory, final MetricsSystem metricsSystem) { - final BytesQueue bytesQueue = RocksDbQueue.create(dataDirectory, metricsSystem); - return new BytesQueueAdapter<>( + final BytesTaskQueue bytesQueue = RocksDbTaskQueue.create(dataDirectory, metricsSystem); + return new BytesTaskQueueAdapter<>( bytesQueue, NodeDataRequest::serialize, NodeDataRequest::deserialize); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index 77316c1bdb..2a5dc1695e 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -26,7 +26,8 @@ import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.OperationTimer; -import tech.pegasys.pantheon.services.queue.BigQueue; +import tech.pegasys.pantheon.services.queue.TaskQueue; +import tech.pegasys.pantheon.services.queue.TaskQueue.Task; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.time.Duration; @@ -55,7 +56,7 @@ private enum Status { } private final EthContext ethContext; - private final BigQueue pendingRequests; + private final TaskQueue pendingRequests; private final int hashCountPerRequest; private final int maxOutstandingRequests; private final AtomicInteger outstandingRequests = new AtomicInteger(0); @@ -69,7 +70,7 @@ private enum Status { public WorldStateDownloader( final EthContext ethContext, final WorldStateStorage worldStateStorage, - final BigQueue pendingRequests, + final TaskQueue pendingRequests, final int hashCountPerRequest, final int maxOutstandingRequests, final LabelledMetric ethTasksTimer, @@ -140,20 +141,22 @@ private void requestNodeData(final BlockHeader header) { EthPeer peer = maybePeer.get(); // Collect data to be requested - List toRequest = new ArrayList<>(); + List> toRequest = new ArrayList<>(); while (toRequest.size() < hashCountPerRequest) { - NodeDataRequest pendingRequest = pendingRequests.dequeue(); - if (pendingRequest == null) { + Task pendingRequestTask = pendingRequests.dequeue(); + if (pendingRequestTask == null) { break; } + NodeDataRequest pendingRequest = pendingRequestTask.getData(); final Optional existingData = pendingRequest.getExistingData(worldStateStorage); if (existingData.isPresent()) { pendingRequest.setData(existingData.get()); queueChildRequests(pendingRequest); + pendingRequestTask.markCompleted(); continue; } - toRequest.add(pendingRequest); + toRequest.add(pendingRequestTask); } // Request and process node data @@ -161,7 +164,8 @@ private void requestNodeData(final BlockHeader header) { sendAndProcessRequests(peer, toRequest, header) .whenComplete( (res, error) -> { - if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) { + if (outstandingRequests.decrementAndGet() == 0 + && pendingRequests.allTasksCompleted()) { // We're done final Updater updater = worldStateStorage.updater(); updater.putAccountStateTrieNode(header.getStateRoot(), rootNode); @@ -201,9 +205,15 @@ private CompletableFuture waitForNewPeer() { } private CompletableFuture sendAndProcessRequests( - final EthPeer peer, final List requests, final BlockHeader blockHeader) { + final EthPeer peer, + final List> requestTasks, + final BlockHeader blockHeader) { List hashes = - requests.stream().map(NodeDataRequest::getHash).distinct().collect(Collectors.toList()); + requestTasks.stream() + .map(Task::getData) + .map(NodeDataRequest::getHash) + .distinct() + .collect(Collectors.toList()); return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer) .assignPeer(peer) .run() @@ -213,11 +223,12 @@ private CompletableFuture sendAndProcessRequests( (data, err) -> { boolean requestFailed = err != null; Updater storageUpdater = worldStateStorage.updater(); - for (NodeDataRequest request : requests) { + for (Task task : requestTasks) { + NodeDataRequest request = task.getData(); BytesValue matchingData = requestFailed ? null : data.get(request.getHash()); if (matchingData == null) { retriedRequestsTotal.inc(); - pendingRequests.enqueue(request); + task.markFailed(); } else { completedRequestsCounter.inc(); // Persist request data @@ -229,6 +240,7 @@ private CompletableFuture sendAndProcessRequests( } queueChildRequests(request); + task.markCompleted(); } } storageUpdater.commit(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index d85d76e1a6..12e4d5b7a0 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -44,8 +44,8 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; -import tech.pegasys.pantheon.services.queue.BigQueue; -import tech.pegasys.pantheon.services.queue.InMemoryBigQueue; +import tech.pegasys.pantheon.services.queue.InMemoryTaskQueue; +import tech.pegasys.pantheon.services.queue.TaskQueue; import tech.pegasys.pantheon.util.bytes.Bytes32; import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; @@ -117,7 +117,7 @@ public void downloadEmptyWorldState() { .limit(5) .collect(Collectors.toList()); - BigQueue queue = new InMemoryBigQueue<>(); + TaskQueue queue = new InMemoryTaskQueue<>(); WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); WorldStateDownloader downloader = @@ -164,7 +164,7 @@ public void downloadAlreadyAvailableWorldState() { .limit(5) .collect(Collectors.toList()); - BigQueue queue = new InMemoryBigQueue<>(); + TaskQueue queue = new InMemoryTaskQueue<>(); WorldStateDownloader downloader = new WorldStateDownloader( ethProtocolManager.ethContext(), @@ -210,7 +210,7 @@ public void canRecoverFromTimeouts() { .limit(5) .collect(Collectors.toList()); - BigQueue queue = new InMemoryBigQueue<>(); + TaskQueue queue = new InMemoryTaskQueue<>(); WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); WorldStateDownloader downloader = @@ -271,7 +271,7 @@ public void doesNotRequestKnownCodeFromNetwork() { .limit(5) .collect(Collectors.toList()); - BigQueue queue = new InMemoryBigQueue<>(); + TaskQueue queue = new InMemoryTaskQueue<>(); WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); @@ -349,7 +349,7 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() { .limit(5) .collect(Collectors.toList()); - BigQueue queue = new InMemoryBigQueue<>(); + TaskQueue queue = new InMemoryTaskQueue<>(); WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); @@ -441,7 +441,7 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { .limit(5) .collect(Collectors.toList()); - BigQueue queue = new InMemoryBigQueue<>(); + TaskQueue queue = new InMemoryTaskQueue<>(); WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); @@ -606,7 +606,7 @@ private void downloadAvailableWorldStateFromPeers( .getHeader(); assertThat(otherStateRoot).isNotEqualTo(stateRoot); // Sanity check - BigQueue queue = new InMemoryBigQueue<>(); + TaskQueue queue = new InMemoryTaskQueue<>(); WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BigQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BigQueue.java deleted file mode 100644 index 793669a8fb..0000000000 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BigQueue.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2018 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.services.queue; - -import java.io.Closeable; - -/** - * Represents a very large thread-safe queue that may exceed memory limits. - * - * @param the type of data held in the queue - */ -public interface BigQueue extends Closeable { - - void enqueue(T value); - - T dequeue(); - - long size(); - - default boolean isEmpty() { - return size() == 0; - } -} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueue.java similarity index 91% rename from services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueue.java rename to services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueue.java index 6abaaf55e0..6039bb0de4 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueue.java +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueue.java @@ -14,4 +14,4 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; -public interface BytesQueue extends BigQueue {} +public interface BytesTaskQueue extends TaskQueue {} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueueAdapter.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapter.java similarity index 51% rename from services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueueAdapter.java rename to services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapter.java index b8c41f8fed..55cf408a6a 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueueAdapter.java +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapter.java @@ -17,14 +17,14 @@ import java.io.IOException; import java.util.function.Function; -public class BytesQueueAdapter implements BigQueue { +public class BytesTaskQueueAdapter implements TaskQueue { - private final BytesQueue queue; + private final BytesTaskQueue queue; private final Function serializer; private final Function deserializer; - public BytesQueueAdapter( - final BytesQueue queue, + public BytesTaskQueueAdapter( + final BytesTaskQueue queue, final Function serializer, final Function deserializer) { this.queue = queue; @@ -33,14 +33,19 @@ public BytesQueueAdapter( } @Override - public void enqueue(final T value) { - queue.enqueue(serializer.apply(value)); + public void enqueue(final T taskData) { + queue.enqueue(serializer.apply(taskData)); } @Override - public T dequeue() { - BytesValue value = queue.dequeue(); - return value == null ? null : deserializer.apply(value); + public Task dequeue() { + Task task = queue.dequeue(); + if (task == null) { + return null; + } + + T data = deserializer.apply(task.getData()); + return new AdapterTask<>(task, data); } @Override @@ -48,8 +53,43 @@ public long size() { return queue.size(); } + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public boolean allTasksCompleted() { + return queue.allTasksCompleted(); + } + @Override public void close() throws IOException { queue.close(); } + + private static class AdapterTask implements Task { + private final Task wrappedTask; + private final T data; + + public AdapterTask(final Task wrappedTask, final T data) { + this.wrappedTask = wrappedTask; + this.data = data; + } + + @Override + public T getData() { + return data; + } + + @Override + public void markCompleted() { + wrappedTask.markCompleted(); + } + + @Override + public void markFailed() { + wrappedTask.markFailed(); + } + } } diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueue.java new file mode 100644 index 0000000000..9236c1c2f6 --- /dev/null +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueue.java @@ -0,0 +1,110 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class InMemoryTaskQueue implements TaskQueue { + private final Queue internalQueue = new ArrayDeque<>(); + private final AtomicInteger unfinishedOutstandingTasks = new AtomicInteger(0); + private final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public synchronized void enqueue(final T taskData) { + assertNotClosed(); + internalQueue.add(taskData); + } + + @Override + public synchronized Task dequeue() { + assertNotClosed(); + T data = internalQueue.poll(); + if (data == null) { + return null; + } + unfinishedOutstandingTasks.incrementAndGet(); + return new InMemoryTask<>(this, data); + } + + @Override + public synchronized long size() { + assertNotClosed(); + return internalQueue.size(); + } + + @Override + public synchronized boolean isEmpty() { + assertNotClosed(); + return size() == 0; + } + + @Override + public boolean allTasksCompleted() { + assertNotClosed(); + return isEmpty() && unfinishedOutstandingTasks.get() == 0; + } + + @Override + public synchronized void close() { + closed.set(true); + internalQueue.clear(); + } + + private void assertNotClosed() { + if (closed.get()) { + throw new IllegalStateException("Attempt to access closed " + getClass().getSimpleName()); + } + } + + private synchronized void handleFailedTask(final InMemoryTask task) { + enqueue(task.getData()); + markTaskCompleted(); + } + + private synchronized void markTaskCompleted() { + unfinishedOutstandingTasks.decrementAndGet(); + } + + private static class InMemoryTask implements Task { + private final T data; + private final InMemoryTaskQueue queue; + private final AtomicBoolean completed = new AtomicBoolean(false); + + public InMemoryTask(final InMemoryTaskQueue queue, final T data) { + this.queue = queue; + this.data = data; + } + + @Override + public T getData() { + return data; + } + + @Override + public void markCompleted() { + if (completed.compareAndSet(false, true)) { + queue.markTaskCompleted(); + } + } + + @Override + public void markFailed() { + if (completed.compareAndSet(false, true)) { + queue.handleFailedTask(this); + } + } + } +} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java similarity index 52% rename from services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java rename to services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java index 36643a46b1..770a31e70e 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java @@ -19,6 +19,9 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.nio.file.Path; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -27,19 +30,22 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -public class RocksDbQueue implements BytesQueue { +public class RocksDbTaskQueue implements BytesTaskQueue { private final Options options; private final RocksDB db; private final AtomicLong lastEnqueuedKey = new AtomicLong(0); private final AtomicLong lastDequeuedKey = new AtomicLong(0); + private final AtomicLong oldestKey = new AtomicLong(0); + private final Set outstandingTasks = new HashSet<>(); + private final AtomicBoolean closed = new AtomicBoolean(false); private final OperationTimer enqueueLatency; private final OperationTimer dequeueLatency; - private RocksDbQueue(final Path storageDirectory, final MetricsSystem metricsSystem) { + private RocksDbTaskQueue(final Path storageDirectory, final MetricsSystem metricsSystem) { try { RocksDbUtil.loadNativeLibrary(); options = new Options().setCreateIfMissing(true); @@ -60,37 +66,39 @@ private RocksDbQueue(final Path storageDirectory, final MetricsSystem metricsSys } } - public static RocksDbQueue create( + public static RocksDbTaskQueue create( final Path storageDirectory, final MetricsSystem metricsSystem) { - return new RocksDbQueue(storageDirectory, metricsSystem); + return new RocksDbTaskQueue(storageDirectory, metricsSystem); } @Override - public synchronized void enqueue(final BytesValue value) { + public synchronized void enqueue(final BytesValue taskData) { assertNotClosed(); try (final OperationTimer.TimingContext ignored = enqueueLatency.startTimer()) { byte[] key = Longs.toByteArray(lastEnqueuedKey.incrementAndGet()); - db.put(key, value.getArrayUnsafe()); + db.put(key, taskData.getArrayUnsafe()); } catch (RocksDBException e) { throw new StorageException(e); } } @Override - public synchronized BytesValue dequeue() { + public synchronized Task dequeue() { assertNotClosed(); - if (size() == 0) { + if (isEmpty()) { return null; } try (final OperationTimer.TimingContext ignored = dequeueLatency.startTimer()) { - byte[] key = Longs.toByteArray(lastDequeuedKey.incrementAndGet()); - byte[] value = db.get(key); + long key = lastDequeuedKey.incrementAndGet(); + byte[] value = db.get(Longs.toByteArray(key)); if (value == null) { throw new IllegalStateException("Next expected value is missing"); } - db.delete(key); - return BytesValue.of(value); + BytesValue data = BytesValue.of(value); + RocksDbTask task = new RocksDbTask(this, data, key); + outstandingTasks.add(task); + return task; } catch (RocksDBException e) { throw new StorageException(e); } @@ -102,6 +110,36 @@ public synchronized long size() { return lastEnqueuedKey.get() - lastDequeuedKey.get(); } + @Override + public synchronized boolean isEmpty() { + return size() == 0; + } + + @Override + public synchronized boolean allTasksCompleted() { + return isEmpty() && outstandingTasks.isEmpty(); + } + + private synchronized void deleteCompletedTasks() { + long oldestOutstandingKey = + outstandingTasks.stream() + .min(Comparator.comparingLong(RocksDbTask::getKey)) + .map(RocksDbTask::getKey) + .orElse(lastDequeuedKey.get() + 1); + + if (oldestKey.get() < oldestOutstandingKey) { + // Delete all contiguous completed task keys + byte[] fromKey = Longs.toByteArray(oldestKey.get()); + byte[] toKey = Longs.toByteArray(oldestOutstandingKey); + try { + db.deleteRange(fromKey, toKey); + oldestKey.set(oldestOutstandingKey); + } catch (RocksDBException e) { + throw new StorageException(e); + } + } + } + @Override public void close() { if (closed.compareAndSet(false, true)) { @@ -112,14 +150,59 @@ public void close() { private void assertNotClosed() { if (closed.get()) { - throw new IllegalStateException( - "Attempt to access closed " + RocksDbQueue.class.getSimpleName()); + throw new IllegalStateException("Attempt to access closed " + getClass().getSimpleName()); } } + private synchronized void markTaskCompleted(final RocksDbTask task) { + outstandingTasks.remove(task); + deleteCompletedTasks(); + } + + private synchronized void handleFailedTask(final RocksDbTask task) { + enqueue(task.getData()); + markTaskCompleted(task); + } + public static class StorageException extends RuntimeException { StorageException(final Throwable t) { super(t); } } + + private static class RocksDbTask implements Task { + private final AtomicBoolean completed = new AtomicBoolean(false); + private final RocksDbTaskQueue parentQueue; + private final BytesValue data; + private final long key; + + private RocksDbTask(final RocksDbTaskQueue parentQueue, final BytesValue data, final long key) { + this.parentQueue = parentQueue; + this.data = data; + this.key = key; + } + + public long getKey() { + return key; + } + + @Override + public BytesValue getData() { + return data; + } + + @Override + public void markCompleted() { + if (completed.compareAndSet(false, true)) { + parentQueue.markTaskCompleted(this); + } + } + + @Override + public void markFailed() { + if (completed.compareAndSet(false, true)) { + parentQueue.handleFailedTask(this); + } + } + } } diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/TaskQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/TaskQueue.java new file mode 100644 index 0000000000..c19f204185 --- /dev/null +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/TaskQueue.java @@ -0,0 +1,57 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import java.io.Closeable; + +/** + * Represents a very large thread-safe task queue that may exceed memory limits. + * + * @param the type of data held in the queue + */ +public interface TaskQueue extends Closeable { + + /** + * Enqueue some data for processing. + * + * @param taskData The data to be processed. + */ + void enqueue(T taskData); + + /** + * Dequeue a task for processing. This task will be tracked as a pending task until either {@code + * Task.markCompleted} or {@code Task.requeue} is called. + * + * @return The task to be processed. + */ + Task dequeue(); + + /** @return The number of tasks in the queue. */ + long size(); + + /** @return True if all tasks have been dequeued. */ + boolean isEmpty(); + + /** @return True if all tasks have been dequeued and processed. */ + boolean allTasksCompleted(); + + interface Task { + T getData(); + + /** Mark this task as completed. */ + void markCompleted(); + + /** Mark this task as failed and requeue. */ + void markFailed(); + } +} diff --git a/services/queue/src/test/tech/pegasys/pantheon/services/queue/AbstractBigQueueTest.java b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/AbstractTaskQueueTest.java similarity index 55% rename from services/queue/src/test/tech/pegasys/pantheon/services/queue/AbstractBigQueueTest.java rename to services/queue/src/test/java/tech/pegasys/pantheon/services/queue/AbstractTaskQueueTest.java index 0d8ba296fd..f2aca7e50e 100644 --- a/services/queue/src/test/tech/pegasys/pantheon/services/queue/AbstractBigQueueTest.java +++ b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/AbstractTaskQueueTest.java @@ -14,6 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import tech.pegasys.pantheon.services.queue.TaskQueue.Task; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; @@ -24,7 +25,7 @@ import org.junit.Test; -abstract class AbstractBigQueueTest> { +abstract class AbstractTaskQueueTest> { protected abstract T createQueue() throws Exception; @@ -39,16 +40,76 @@ public void enqueueAndDequeue() throws Exception { queue.enqueue(one); queue.enqueue(two); - assertThat(queue.dequeue()).isEqualTo(one); + assertThat(queue.dequeue().getData()).isEqualTo(one); queue.enqueue(three); - assertThat(queue.dequeue()).isEqualTo(two); - assertThat(queue.dequeue()).isEqualTo(three); + assertThat(queue.dequeue().getData()).isEqualTo(two); + assertThat(queue.dequeue().getData()).isEqualTo(three); assertThat(queue.dequeue()).isNull(); assertThat(queue.dequeue()).isNull(); queue.enqueue(three); - assertThat(queue.dequeue()).isEqualTo(three); + assertThat(queue.dequeue().getData()).isEqualTo(three); + } + } + + @Test + public void markTaskFailed() throws Exception { + try (T queue = createQueue()) { + BytesValue value = BytesValue.of(1); + + assertThat(queue.isEmpty()).isTrue(); + assertThat(queue.allTasksCompleted()).isTrue(); + + queue.enqueue(value); + + assertThat(queue.isEmpty()).isFalse(); + assertThat(queue.allTasksCompleted()).isFalse(); + + Task task = queue.dequeue(); + assertThat(task).isNotNull(); + assertThat(task.getData()).isEqualTo(value); + assertThat(queue.isEmpty()).isTrue(); + assertThat(queue.allTasksCompleted()).isFalse(); + + task.markFailed(); + assertThat(queue.isEmpty()).isFalse(); + assertThat(queue.allTasksCompleted()).isFalse(); + + // Subsequent mark completed should do nothing + task.markCompleted(); + assertThat(queue.isEmpty()).isFalse(); + assertThat(queue.allTasksCompleted()).isFalse(); + } + } + + @Test + public void markTaskCompleted() throws Exception { + try (T queue = createQueue()) { + BytesValue value = BytesValue.of(1); + + assertThat(queue.isEmpty()).isTrue(); + assertThat(queue.allTasksCompleted()).isTrue(); + + queue.enqueue(value); + + assertThat(queue.isEmpty()).isFalse(); + assertThat(queue.allTasksCompleted()).isFalse(); + + Task task = queue.dequeue(); + assertThat(task).isNotNull(); + assertThat(task.getData()).isEqualTo(value); + assertThat(queue.isEmpty()).isTrue(); + assertThat(queue.allTasksCompleted()).isFalse(); + + task.markCompleted(); + assertThat(queue.isEmpty()).isTrue(); + assertThat(queue.allTasksCompleted()).isTrue(); + + // Subsequent mark failed should do nothing + task.markFailed(); + assertThat(queue.isEmpty()).isTrue(); + assertThat(queue.allTasksCompleted()).isTrue(); } } @@ -62,13 +123,14 @@ public void handlesConcurrentQueuing() throws Exception { final CountDownLatch queuingFinished = new CountDownLatch(threadCount); // Start thread for reading values - List dequeued = new ArrayList<>(); + List> dequeued = new ArrayList<>(); Thread reader = new Thread( () -> { while (queuingFinished.getCount() > 0 || !queue.isEmpty()) { if (!queue.isEmpty()) { - BytesValue value = queue.dequeue(); + Task value = queue.dequeue(); + value.markCompleted(); dequeued.add(value); } } diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryBigQueue.java b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapterTest.java similarity index 50% rename from services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryBigQueue.java rename to services/queue/src/test/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapterTest.java index b40b4ea186..40f055d6dd 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryBigQueue.java +++ b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapterTest.java @@ -12,29 +12,22 @@ */ package tech.pegasys.pantheon.services.queue; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.util.bytes.BytesValue; -public class InMemoryBigQueue implements BigQueue { - private final Queue internalQueue = new ConcurrentLinkedQueue<>(); +import java.util.function.Function; - @Override - public void enqueue(final T value) { - internalQueue.add(value); - } +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; - @Override - public T dequeue() { - return internalQueue.poll(); - } +public class BytesTaskQueueAdapterTest extends AbstractTaskQueueTest> { - @Override - public long size() { - return internalQueue.size(); - } + @Rule public final TemporaryFolder folder = new TemporaryFolder(); @Override - public void close() { - internalQueue.clear(); + protected TaskQueue createQueue() throws Exception { + BytesTaskQueue queue = + RocksDbTaskQueue.create(folder.newFolder().toPath(), new NoOpMetricsSystem()); + return new BytesTaskQueueAdapter<>(queue, Function.identity(), Function.identity()); } } diff --git a/services/queue/src/test/tech/pegasys/pantheon/services/queue/InMemoryBigQueueTest.java b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueueTest.java similarity index 76% rename from services/queue/src/test/tech/pegasys/pantheon/services/queue/InMemoryBigQueueTest.java rename to services/queue/src/test/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueueTest.java index 3d4c2fb626..f7994ef1b3 100644 --- a/services/queue/src/test/tech/pegasys/pantheon/services/queue/InMemoryBigQueueTest.java +++ b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueueTest.java @@ -14,10 +14,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; -public class InMemoryBigQueueTest extends AbstractBigQueueTest> { +public class InMemoryTaskQueueTest extends AbstractTaskQueueTest> { @Override - protected InMemoryBigQueue createQueue() throws Exception { - return new InMemoryBigQueue<>(); + protected InMemoryTaskQueue createQueue() throws Exception { + return new InMemoryTaskQueue<>(); } } diff --git a/services/queue/src/test/tech/pegasys/pantheon/services/queue/RocksDbQueueTest.java b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java similarity index 78% rename from services/queue/src/test/tech/pegasys/pantheon/services/queue/RocksDbQueueTest.java rename to services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java index 1fd25ae239..bf3b5c65de 100644 --- a/services/queue/src/test/tech/pegasys/pantheon/services/queue/RocksDbQueueTest.java +++ b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java @@ -19,12 +19,12 @@ import org.junit.Rule; import org.junit.rules.TemporaryFolder; -public class RocksDbQueueTest extends AbstractBigQueueTest { +public class RocksDbTaskQueueTest extends AbstractTaskQueueTest { @Rule public final TemporaryFolder folder = new TemporaryFolder(); @Override - protected RocksDbQueue createQueue() throws IOException { - return RocksDbQueue.create(folder.newFolder().toPath(), new NoOpMetricsSystem()); + protected RocksDbTaskQueue createQueue() throws IOException { + return RocksDbTaskQueue.create(folder.newFolder().toPath(), new NoOpMetricsSystem()); } }