From a48110eba7caf60a48bad660f7adc33e3680db88 Mon Sep 17 00:00:00 2001 From: Artem Ananev Date: Tue, 21 Jan 2025 12:21:40 -0800 Subject: [PATCH] fix: 17467: Back out changes for 15448 from release 0.58 Signed-off-by: Artem Ananev --- .../swirlds/benchmark/VirtualMapBench.java | 64 +--- .../VirtualMapSerializationTests.java | 88 +----- .../internal/cache/VirtualNodeCache.java | 295 +++-------------- .../internal/merkle/VirtualRootNode.java | 40 +-- .../internal/pipeline/VirtualPipeline.java | 24 +- .../internal/pipeline/VirtualRoot.java | 6 +- .../internal/cache/VirtualNodeCacheTest.java | 144 ++------- .../internal/merkle/VirtualRootNodeTest.java | 298 ------------------ .../internal/pipeline/DummyVirtualRoot.java | 6 +- .../internal/pipeline/NoOpVirtualRoot.java | 6 +- .../pipeline/VirtualPipelineTests.java | 6 +- 11 files changed, 100 insertions(+), 877 deletions(-) diff --git a/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/VirtualMapBench.java b/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/VirtualMapBench.java index a8dba8d84a15..3b03aeafaaa8 100644 --- a/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/VirtualMapBench.java +++ b/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/VirtualMapBench.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2022-2024 Hedera Hashgraph, LLC + * Copyright (C) 2022-2025 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -261,66 +261,4 @@ public void read() throws Exception { afterTest(true); } - - @Benchmark - public void queueMode() throws Exception { - beforeTest("queueMode"); - - final long[] map = new long[verify ? maxKey : 0]; - VirtualMap virtualMap = createMap(map); - - final int roundsPerCopy = maxKey / numFiles; - for (int i = 0; i < maxKey; i++) { - // Add - int index = i; - final BenchmarkKey keyToAdd = new BenchmarkKey(index); - long val = nextValue(); - virtualMap.put(keyToAdd, new BenchmarkValue(val)); - if (verify) { - map[index] = val; - } - // Update - if (i >= numRecords / 2) { - index = i - numRecords / 2; - final BenchmarkKey keyToUpdate = new BenchmarkKey(index); - val = nextValue(); - virtualMap.put(keyToUpdate, new BenchmarkValue(val)); - if (verify) { - map[index] = val; - } - } - // Remove - if (i >= numRecords) { - index = i - numRecords; - final BenchmarkKey keyToRemove = new BenchmarkKey(index); - virtualMap.remove(keyToRemove); - if (verify) { - map[index] = 0; - } - } - - if (i % roundsPerCopy == 0) { - virtualMap = copyMap(virtualMap); - } - } - - // Ensure the map is done with hashing/merging/flushing - final var finalMap = flushMap(virtualMap); - - verifyMap(map, finalMap); - - afterTest(true, () -> { - finalMap.release(); - finalMap.getDataSource().close(); - }); - } - - public static void main(String[] args) throws Exception { - final VirtualMapBench bench = new VirtualMapBench(); - bench.setup(); - bench.beforeTest(); - bench.queueMode(); - bench.afterTest(); - bench.destroy(); - } } diff --git a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/VirtualMapSerializationTests.java b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/VirtualMapSerializationTests.java index e512190f00d0..f2af987d0fdf 100644 --- a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/VirtualMapSerializationTests.java +++ b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/VirtualMapSerializationTests.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2021-2024 Hedera Hashgraph, LLC + * Copyright (C) 2021-2025 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.swirlds.common.config.singleton.ConfigurationHolder; import com.swirlds.common.constructable.ClassConstructorPair; import com.swirlds.common.constructable.ConstructableRegistry; import com.swirlds.common.constructable.ConstructableRegistryException; @@ -39,13 +38,11 @@ import com.swirlds.common.merkle.crypto.MerkleCryptoFactory; import com.swirlds.common.merkle.route.MerkleRoute; import com.swirlds.config.api.Configuration; -import com.swirlds.config.extensions.test.fixtures.TestConfigBuilder; import com.swirlds.merkledb.test.fixtures.ExampleFixedSizeVirtualValue; import com.swirlds.merkledb.test.fixtures.ExampleFixedSizeVirtualValueSerializer; import com.swirlds.merkledb.test.fixtures.ExampleLongKeyFixedSize; import com.swirlds.virtualmap.VirtualMap; import com.swirlds.virtualmap.config.VirtualMapConfig; -import com.swirlds.virtualmap.config.VirtualMapConfig_; import com.swirlds.virtualmap.datasource.VirtualDataSourceBuilder; import com.swirlds.virtualmap.internal.cache.VirtualNodeCache; import com.swirlds.virtualmap.internal.merkle.VirtualInternalNode; @@ -379,87 +376,4 @@ void serializeFlushedAndUnflushedData(final int count) throws InterruptedExcepti MILLISECONDS.sleep(100); // Hack. Release methods may not have finished their work yet. } - - @Test - void inMemoryModeSerde() throws IOException { - final Configuration configuration = new TestConfigBuilder() - .withValue(VirtualMapConfig_.COPY_FLUSH_THRESHOLD, 1_000_000) - .getOrCreateConfig(); - ConfigurationHolder.getInstance().setConfiguration(configuration); - - VirtualMap map = new VirtualMap<>( - "inMemoryModeSerde", KEY_SERIALIZER, VALUE_SERIALIZER, constructBuilder(configuration), configuration); - - // Copy 0 - for (int i = 0; i < 100; i++) { - final ExampleLongKeyFixedSize key = new ExampleLongKeyFixedSize(i); - final ExampleFixedSizeVirtualValue value = new ExampleFixedSizeVirtualValue(1000000 + i); - map.put(key, value); - } - - // Copy 1 - final VirtualMap copy1 = map.copy(); - map.release(); - map = copy1; - for (int i = 100; i < 200; i++) { - final ExampleLongKeyFixedSize key = new ExampleLongKeyFixedSize(i); - final ExampleFixedSizeVirtualValue value = new ExampleFixedSizeVirtualValue(1000000 + i); - map.put(key, value); - } - // Add more entries to copy 1 to force it to flush - for (int i = 100000; i < 120000; i++) { - final ExampleLongKeyFixedSize key = new ExampleLongKeyFixedSize(i); - final ExampleFixedSizeVirtualValue value = new ExampleFixedSizeVirtualValue(1000000 + i); - map.put(key, value); - } - - final int nCopies = 100; - for (int copyNo = 2; copyNo < nCopies; copyNo++) { - final VirtualMap copy = map.copy(); - map.release(); - map = copy; - for (int i = 0; i < 100; i++) { - final int toAdd = copyNo * 100 + i; - final ExampleLongKeyFixedSize keyToAdd = new ExampleLongKeyFixedSize(toAdd); - final ExampleFixedSizeVirtualValue value = new ExampleFixedSizeVirtualValue(1000000 + toAdd); - map.put(keyToAdd, value); - final int toRemove = (copyNo - 2) * 100 + i + 75; - final ExampleLongKeyFixedSize keytoRemove = new ExampleLongKeyFixedSize(toRemove); - final ExampleFixedSizeVirtualValue removed = map.remove(keytoRemove); - assertNotNull(removed); - } - } - - // Final copy - final VirtualMap copyF = map.copy(); - map.release(); - map = copyF; - - // And one more to make sure copyF is immutable and can be serialized - final VirtualMap copyOneMore = map.copy(); - - final Hash originalHash = MerkleCryptoFactory.getInstance().digestTreeSync(copyF); - - final ByteArrayOutputStream bout = new ByteArrayOutputStream(); - final Path tmp = LegacyTemporaryFileBuilder.buildTemporaryDirectory("inMemoryModeSerde", configuration); - try (final SerializableDataOutputStream out = new SerializableDataOutputStream(bout)) { - copyF.serialize(out, tmp); - } - - MerkleDb.resetDefaultInstancePath(); - - final ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray()); - map = new VirtualMap<>(configuration); - try (final SerializableDataInputStream in = new SerializableDataInputStream(bin)) { - map.deserialize(in, tmp, 3); - } - - final VirtualMap copyAfter = map.copy(); - - final Hash restoredHash = MerkleCryptoFactory.getInstance().digestTreeSync(map); - assertEquals(originalHash, restoredHash); - - copyOneMore.release(); - copyAfter.release(); - } } diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java index 1a8e3b5b67c4..5dc563b65a68 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java @@ -245,15 +245,6 @@ private static synchronized Executor getCleaningPool(@NonNull final VirtualMapCo */ private final AtomicBoolean released = new AtomicBoolean(false); - /** - * Indicates whether this cache has been prepared for flush by calling its {@link - * #prepareForFlush()} method. In most cases counters like {@link #filteredLeavesCount}, - * {@link #filteredLeafPathsCount}, and {@link #filteredHashesCount} are expected - * to be greater than zero after the method is called, but sometimes they are all - * zeroes, and therefore a separate flag is used. - */ - private final AtomicBoolean preparedForFlush = new AtomicBoolean(false); - /** * Whether the leaf indexes in this cache are immutable. We track * immutability of leaves and internal nodes separately, because leaves are only @@ -282,13 +273,6 @@ private static synchronized Executor getCleaningPool(@NonNull final VirtualMapCo */ private volatile ConcurrentArray>> dirtyLeaves = new ConcurrentArray<>(); - /** - * When the cache is filtered using {@link #filterMutations(ConcurrentArray, Map, long, long, AtomicLong, VirtualMapConfig)}, - * this field stores the number of filtered mutations in {@link #dirtyLeaves}. This number - * affects cache's estimated size. - */ - private final AtomicLong filteredLeavesCount = new AtomicLong(0); - /** * A set of leaf path changes that occurred in this version of the cache. This is separate * from dirtyLeaves because dirtyLeaves captures the history of changes to leaves, while @@ -299,13 +283,6 @@ private static synchronized Executor getCleaningPool(@NonNull final VirtualMapCo */ private volatile ConcurrentArray> dirtyLeafPaths = new ConcurrentArray<>(); - /** - * When the cache is filtered using {@link #filterMutations(ConcurrentArray, Map, long, long, AtomicLong, VirtualMapConfig)}, - * this field stores the number of filtered mutations in {@link #dirtyLeafPaths}. This number - * affects cache's estimated size. - */ - private final AtomicLong filteredLeafPathsCount = new AtomicLong(0); - /** * A set of all modifications to node hashes that occurred in this version of the cache. * We use a list as an optimization, but it requires us to filter out mutations for the @@ -316,13 +293,6 @@ private static synchronized Executor getCleaningPool(@NonNull final VirtualMapCo */ private volatile ConcurrentArray> dirtyHashes = new ConcurrentArray<>(); - /** - * When the cache is filtered using {@link #filterMutations(ConcurrentArray, Map, long, long, AtomicLong, VirtualMapConfig)}, - * this field stores the number of filtered mutations in {@link #dirtyHashes}. This number - * affects cache's estimated size. - */ - private final AtomicLong filteredHashesCount = new AtomicLong(0); - /** * Indicates if this virtual cache instance contains mutations from older cache versions * as a result of cache merge operation. @@ -580,10 +550,6 @@ public void seal() { // most recent copy. The query APIs can be called from any thread. // -------------------------------------------------------------------------------------------- - public VirtualLeafRecord putLeaf(final VirtualLeafRecord leaf) { - return putLeaf(leaf, false); - } - /** * Puts a leaf into the cache. Called whenever there is a new leaf or * whenever the value of the leaf has changed. Note that the caller is @@ -603,15 +569,12 @@ public VirtualLeafRecord putLeaf(final VirtualLeafRecord leaf) { * * @param leaf * The leaf to put. Must not be null. Must have the correct key and path. - * @param newRecord - * Indicates if this leaf update is to insert a new entity rather than to update - * an existing one * @throws NullPointerException * if the leaf is null * @throws MutabilityException * if the cache is immutable for leaf changes */ - public VirtualLeafRecord putLeaf(final VirtualLeafRecord leaf, boolean newRecord) { + public VirtualLeafRecord putLeaf(final VirtualLeafRecord leaf) { throwIfLeafImmutable(); requireNonNull(leaf); @@ -625,7 +588,7 @@ public VirtualLeafRecord putLeaf(final VirtualLeafRecord leaf, boole // Get the first data element (mutation) in the list based on the key, // and then create or update the associated mutation. - return keyToDirtyLeafIndex.compute(key, (k, mutations) -> mutate(leaf, newRecord, mutations)).value; + return keyToDirtyLeafIndex.compute(key, (k, mutations) -> mutate(leaf, mutations)).value; } /** @@ -654,7 +617,7 @@ public void deleteLeaf(final VirtualLeafRecord leaf) { final K key = leaf.getKey(); assert key != null : "Keys cannot be null"; keyToDirtyLeafIndex.compute(key, (k, mutations) -> { - mutations = mutate(leaf, false, mutations); + mutations = mutate(leaf, mutations); mutations.setDeleted(true); assert pathToDirtyLeafIndex.get(leaf.getPath()).isDeleted() : "It should be deleted too"; return mutations; @@ -811,9 +774,7 @@ public Stream> dirtyLeavesForHash(final long firstLeafPa if (mergedCopy.get()) { throw new IllegalStateException("Cannot get dirty leaves for hashing on a merged cache copy"); } - // This method is called on a cache copy, which is not a result of merging older - // copies. There is no need to filter mutations here - final Stream> result = dirtyLeaves(firstLeafPath, lastLeafPath); + final Stream> result = dirtyLeaves(firstLeafPath, lastLeafPath, false); return result.sorted(Comparator.comparingLong(VirtualLeafRecord::getPath)); } @@ -821,8 +782,6 @@ public Stream> dirtyLeavesForHash(final long firstLeafPa * Returns a stream of dirty leaves from this cache instance to flush this virtual map copy and all * previous copies merged into this one to disk. * - *

{@link #prepareForFlush()} must be called before this method. - * * @param firstLeafPath * The first leaf path to include to the stream * @param lastLeafPath @@ -831,10 +790,7 @@ public Stream> dirtyLeavesForHash(final long firstLeafPa * A stream of dirty leaves for flushes */ public Stream> dirtyLeavesForFlush(final long firstLeafPath, final long lastLeafPath) { - if (!preparedForFlush.get()) { - throw new IllegalStateException("This cache has not been prepared for flush"); - } - return dirtyLeaves(firstLeafPath, lastLeafPath); + return dirtyLeaves(firstLeafPath, lastLeafPath, true); } /** @@ -858,19 +814,29 @@ public Stream> dirtyLeavesForFlush(final long firstLeafP * The last leaf path to receive in the results. It is possible, through merging of multiple rounds, * for the data to have leaf data that is outside the expected range for the {@link VirtualMap} of * this cache. We need to provide the leaf boundaries to compensate for this. + * @param dedupe + * Indicates if the duplicated entries should be removed from the stream * @return A non-null stream of dirty leaves. May be empty. Will not contain duplicate records * @throws MutabilityException if called on a cache that still allows dirty leaves to be added */ - private Stream> dirtyLeaves(final long firstLeafPath, final long lastLeafPath) { + private Stream> dirtyLeaves( + final long firstLeafPath, final long lastLeafPath, final boolean dedupe) { if (!dirtyLeaves.isImmutable()) { throw new MutabilityException("Cannot call on a cache that is still mutable for dirty leaves"); } + if (dedupe) { + // Mark obsolete mutations to filter later + filterMutations(dirtyLeaves, virtualMapConfig); + } return dirtyLeaves.stream() .filter(mutation -> { final long path = mutation.value.getPath(); return path >= firstLeafPath && path <= lastLeafPath; }) - .filter(mutation -> !mutation.isFiltered()) + .filter(mutation -> { + assert dedupe || !mutation.isFiltered(); + return !mutation.isFiltered(); + }) .filter(mutation -> !mutation.isDeleted()) .map(mutation -> mutation.value); } @@ -881,7 +847,7 @@ private Stream> dirtyLeaves(final long firstLeafPath, fi * @return Estimated number of dirty leaf nodes */ public long estimatedDirtyLeavesCount() { - return (dirtyLeaves == null) ? 0 : (dirtyLeaves.size() - filteredLeavesCount.get()); + return (dirtyLeaves == null) ? 0 : dirtyLeaves.size(); } /** @@ -1034,8 +1000,6 @@ public Hash lookupHashByPath(final long path, final boolean forModify) { *

This method may be called concurrently from multiple threads (although in practice, this should * never happen). * - *

{@link #prepareForFlush()} must be called before this method. - * * @param lastLeafPath * The last leaf path at and above which no node results should be returned. It is possible, * through merging of multiple rounds, for the data to have data that is outside the expected range @@ -1047,9 +1011,8 @@ public Stream dirtyHashesForFlush(final long lastLeafPath) { if (!dirtyHashes.isImmutable()) { throw new MutabilityException("Cannot get the dirty internal records for a non-sealed cache."); } - if (!preparedForFlush.get()) { - throw new IllegalStateException("This cache has not been prepared for flush"); - } + // Mark obsolete mutations to filter later + filterMutations(dirtyHashes, virtualMapConfig); return dirtyHashes.stream() .filter(mutation -> mutation.key <= lastLeafPath) .filter(mutation -> !mutation.isFiltered()) @@ -1060,89 +1023,11 @@ public Stream dirtyHashesForFlush(final long lastLeafPath) { /** * Gets estimated number of dirty hashes in this cache. * - *

This method and similar methods about leaves are called multiple times. First, - * the method is used to estimate this cache copy size to make a decision whether to - * flush the copy to disk or to merge it to a newer copy. If the copy is selected to - * flush, its {@link #prepareForFlush()} method is called, it may mark some mutations - * as filtered, which affects the estimated size. After that this method is called - * again, it takes filtered mutations into consideration. If the updated estimated - * size is lower than flush threshold, the copy is not flushed to disk, but rather - * its {@link #garbageCollect()} is called. - * * @return * Estimated number of dirty hashes */ public long estimatedHashesCount() { - return (dirtyHashes == null) ? 0 : (dirtyHashes.size() - filteredHashesCount.get()); - } - - /** - * This cache copy is selected to flush to disk, either because it is explicitly - * marked as such, or when cache copy estimated size exceeds flush threshold. Before - * flush, this method is used to mark redundant mutations as "filtered", so they aren't - * included to the streams for the data source. It may happen that after filtering the - * cache copy is no longer needed to flush. In this case, all filtered mutations are - * removed using {@link #garbageCollect()} method. - * - *

This method can only be called on sealed caches. - * - * @throws MutabilityException if called on a non-sealed cache instance. - */ - public void prepareForFlush() { - if (!hashesAreImmutable.get() || !leafIndexesAreImmutable.get()) { - throw new MutabilityException("Cannot prepare for flushing for a non-sealed cache"); - } - if (preparedForFlush.getAndSet(true)) { - throw new IllegalStateException("This cache has been already prepared for flush"); - } - // Mark obsolete mutations to filter later and update "filtered" counters. These - // counters will affect the estimated size - final long version = getFastCopyVersion(); - final long lastReleasedVersion = lastReleased.get(); - filterMutations( - dirtyHashes, pathToDirtyHashIndex, version, lastReleasedVersion, filteredHashesCount, virtualMapConfig); - filterMutations( - dirtyLeafPaths, - pathToDirtyLeafIndex, - version, - lastReleasedVersion, - filteredLeafPathsCount, - virtualMapConfig); - filterMutations( - dirtyLeaves, keyToDirtyLeafIndex, version, lastReleasedVersion, filteredLeavesCount, virtualMapConfig); - } - - /** - * If a cache copy's estimated size exceeded flush threshold, but after mutations are - * filtered the size drops below the threshold, the copy is not flushed to disk, but - * just removes all filtered mutations using this method. - * - *

This method can only be called on sealed caches. - * - * @throws MutabilityException if called on a non-sealed cache instance. - */ - public void garbageCollect() { - if (!hashesAreImmutable.get() || !leafIndexesAreImmutable.get()) { - throw new MutabilityException("Cannot run garbage collection for a non-sealed cache"); - } - final Stream> filteredHashes = dirtyHashes.stream() - .filter(mutation -> !mutation.isFiltered()) - // Leave only the latest mutation for every hash path by setting next to null - .peek(mutation -> mutation.next = null); - dirtyHashes = new ConcurrentArray<>(filteredHashes); - filteredHashesCount.set(0); - final Stream> filteredLeafPaths = dirtyLeafPaths.stream() - .filter(mutation -> !mutation.isFiltered()) - // Leave only the latest mutation for every leaf path by setting next to null - .peek(mutation -> mutation.next = null); - dirtyLeafPaths = new ConcurrentArray<>(filteredLeafPaths); - filteredLeafPathsCount.set(0); - final Stream>> filteredLeaves = dirtyLeaves.stream() - .filter(mutation -> !mutation.isFiltered()) - // Leave only the latest mutation for every leaf by setting next to null - .peek(mutation -> mutation.next = null); - dirtyLeaves = new ConcurrentArray<>(filteredLeaves); - filteredLeavesCount.set(0); + return (dirtyHashes == null) ? 0 : dirtyHashes.size(); } /** @@ -1346,10 +1231,6 @@ private void updatePaths( }); } - private Mutation lookup(Mutation mutation) { - return lookup(mutation, fastCopyVersion.get()); - } - /** * Given a mutation list, look up the most recent mutation to this version, but no newer than this * cache's version. This method is very fast. Newer mutations are closer to the head of the mutation list, @@ -1362,7 +1243,7 @@ private Mutation lookup(Mutation mutation) { * The value type held by the mutation. It will be either a Key, leaf record, or a hash. * @return null if the mutation could be found, or the mutation. */ - private static Mutation lookup(Mutation mutation, final long upToVersion) { + private Mutation lookup(Mutation mutation) { // Walk the list of values until we find the best match for our version for (; ; ) { // If mutation is null, then there is nothing else to look for. We're done. @@ -1370,7 +1251,7 @@ private static Mutation lookup(Mutation mutation, final return null; } // We have found the best match - if (mutation.version <= upToVersion) { + if (mutation.version <= fastCopyVersion.get()) { return mutation; } // Look up the next mutation @@ -1389,7 +1270,7 @@ private static Mutation lookup(Mutation mutation, final * @return The mutation for this leaf. */ private Mutation> mutate( - final VirtualLeafRecord leaf, final boolean isNew, Mutation> mutation) { + final VirtualLeafRecord leaf, Mutation> mutation) { // We only create a new mutation if one of the following is true: // - There is no mutation in the cache (mutation == null) @@ -1401,9 +1282,6 @@ private Mutation> mutate( // Create a new mutation final Mutation> newerMutation = new Mutation<>(mutation, leaf.getKey(), leaf, fastCopyVersion.get()); - if (isNew) { - newerMutation.setNew(); - } dirtyLeaves.add(newerMutation); mutation = newerMutation; } else if (mutation.value != leaf) { @@ -1438,24 +1316,21 @@ private static void purge( final ConcurrentArray> array, final Map> index, @NonNull final VirtualMapConfig virtualMapConfig) { - array.parallelTraverse(getCleaningPool(virtualMapConfig), element -> { - if (element.isFiltered()) { - return; - } - index.compute(element.key, (key, mutation) -> { - if (mutation == null || element.equals(mutation)) { - // Already removed for a more recent mutation - return null; - } - for (Mutation m = mutation; m.next != null; m = m.next) { - if (element.equals(m.next)) { - m.next = null; - break; + array.parallelTraverse( + getCleaningPool(virtualMapConfig), + element -> index.compute(element.key, (key, mutation) -> { + if (mutation == null || element.equals(mutation)) { + // Already removed for a more recent mutation + return null; } - } - return mutation; - }); - }); + for (Mutation m = mutation; m.next != null; m = m.next) { + if (element.equals(m.next)) { + m.next = null; + break; + } + } + return mutation; + })); } /** @@ -1471,85 +1346,19 @@ private static void purge( * BE AWARE: this method is called from the other NON-static method with providing the configuration. * * @param array the list of mutations to process - * @param index the corresponding index, it's used to look up the newest mutations - * for a key - * @param newestVersion the newest version of all mutations in the array - * @param lastReleasedVersion the latest flushed version * @param * The key type used in the index * @param * The value type referenced by the mutation list */ private static void filterMutations( - final ConcurrentArray> array, - final Map> index, - final long newestVersion, - final long lastReleasedVersion, - final AtomicLong filteredCounter, - @NonNull final VirtualMapConfig virtualMapConfig) { - filteredCounter.set(0); + final ConcurrentArray> array, @NonNull final VirtualMapConfig virtualMapConfig) { final Consumer> action = mutation -> { // local variable is required because mutation.next can be changed by another thread to null // see https://github.com/hashgraph/hedera-services/issues/7046 for the context final Mutation nextMutation = mutation.next; - if (nextMutation == null) { - return; - } - mutation.next = null; - assert !nextMutation.isFiltered(); - // There may be older mutations being purged in parallel, they should not contribute - // to the "filtered" counter - if (!nextMutation.isFiltered() && (nextMutation.version > lastReleasedVersion)) { + if (nextMutation != null) { nextMutation.setFiltered(); - filteredCounter.incrementAndGet(); - } - if (!nextMutation.isNew()) { - return; - } - // nextMutation is to put a new element into a virtual map. The element doesn't - // exist in the data source. If this mutation is filtered, there must be a newer - // mutation for the same key. If that newer mutation has the "deleted" flag, the - // element should never be flushed to disk - final Mutation latestMutation = index.get(mutation.key); - // If latestMutation is null, lookup() can handle it just fine - final Mutation latestMutationUpToVersion = lookup(latestMutation, newestVersion); - if (latestMutationUpToVersion == null) { - // Mutations are processed on many threads, see array.parallelTraverse() call - // below. The key may be removed from the index or the latest mutation up to - // newestVersion may be removed in parallel on a different thread - return; - } - assert !latestMutationUpToVersion.isFiltered(); - if (latestMutationUpToVersion.isDeleted()) { - if (!latestMutationUpToVersion.isFiltered()) { - latestMutationUpToVersion.setFiltered(); - filteredCounter.incrementAndGet(); - } - // If the latest mutation up to newestVersion is "deleted", and there are no - // newer mutations, the whole entry for the key can be removed from the index. - // It's safe to do so here, as there are no references to copies older than - // newestVersion and there are no mutations in versions newer than newestVersion - index.compute(mutation.key, (k, v) -> { - assert v != null; - if (v == latestMutationUpToVersion) { - return null; - } - Mutation m = v; - while ((m != null) && (m.next != latestMutationUpToVersion)) { - m = m.next; - } - // m may be null, if latestMutationUpToVersion was removed from the list of - // mutations in a parallel thread - if (m != null) { - assert !m.isFiltered(); - assert m.version > newestVersion; - m.next = null; - } - return v; - }); - } else { - // Propagate the "new" flag to the newer mutation - latestMutationUpToVersion.setNew(); } }; try { @@ -1622,7 +1431,7 @@ private void serializePathToDirtyHashIndex( assert mutation.version <= this.fastCopyVersion.get() : "Trying to serialize pathToDirtyInternalIndex with a version ahead"; out.writeLong(mutation.version); - out.writeByte(mutation.getFlags()); + out.writeBoolean(mutation.isDeleted()); if (!mutation.isDeleted()) { out.writeSerializable(mutation.value, true); } @@ -1646,9 +1455,7 @@ private void deserializePathToDirtyHashIndex( for (int index = 0; index < sizeOfMap; index++) { final long key = in.readLong(); final long mutationVersion = in.readLong(); - final byte flags = in.readByte(); - final boolean isNew = Mutation.getFlag(flags, Mutation.FLAG_BIT_NEW); - final boolean isDeleted = Mutation.getFlag(flags, Mutation.FLAG_BIT_DELETED); + final boolean isDeleted = in.readBoolean(); Hash hash = null; if (!isDeleted) { if (version == ClassVersion.ORIGINAL) { @@ -1658,9 +1465,6 @@ private void deserializePathToDirtyHashIndex( hash = in.readSerializable(); } final Mutation mutation = new Mutation<>(null, key, hash, mutationVersion); - if (isNew) { - mutation.setNew(); - } mutation.setDeleted(isDeleted); map.put(key, mutation); dirtyHashes.add(mutation); @@ -1816,9 +1620,6 @@ private static final class Mutation { // A bit in the flags field, which indicates whether this mutation should not be included // into resulting stream of dirty hashes / leaves private static final int FLAG_BIT_FILTERED = 1; - // A bit in the flags field, which indicates whether this mutation is to insert a new - // entry to the map. It's only used for dirtyLeaves mutations - private static final int FLAG_BIT_NEW = 2; Mutation(Mutation next, K key, V value, long version) { this.next = next; @@ -1827,10 +1628,6 @@ private static final class Mutation { this.version = version; } - byte getFlags() { - return flags; - } - static boolean getFlag(final byte flags, final int bit) { return ((0xFF & flags) & (1 << bit)) != 0; } @@ -1859,14 +1656,6 @@ boolean isFiltered() { void setFiltered() { setFlag(FLAG_BIT_FILTERED, true); } - - boolean isNew() { - return getFlag(flags, FLAG_BIT_NEW); - } - - void setNew() { - setFlag(FLAG_BIT_NEW, true); - } } /** diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNode.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNode.java index 94ec0d1c38ca..91a94ecc3c47 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNode.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNode.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2021-2024 Hedera Hashgraph, LLC + * Copyright (C) 2021-2025 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1195,7 +1195,7 @@ public void waitUntilFlushed() throws InterruptedException { * {@inheritDoc} */ @Override - public boolean flush() { + public void flush() { if (!isImmutable()) { throw new IllegalStateException("mutable copies can not be flushed"); } @@ -1206,25 +1206,19 @@ public boolean flush() { throw new IllegalStateException("a merged copy can not be flushed"); } - // Prepare the cache for flush. It may affect cache's estimated size - cache.prepareForFlush(); - if (shouldBeFlushed()) { - logger.debug(VIRTUAL_MERKLE_STATS.getMarker(), "To flush {}", cache.getFastCopyVersion()); - final long start = System.currentTimeMillis(); - flush(cache, state, dataSource); - cache.release(); - final long end = System.currentTimeMillis(); - flushed.set(true); - flushLatch.countDown(); - statistics.recordFlush(end - start); - logger.debug( - VIRTUAL_MERKLE_STATS.getMarker(), "Flushed {} in {} ms", cache.getFastCopyVersion(), end - start); - return true; - } else { - logger.debug(VIRTUAL_MERKLE_STATS.getMarker(), "To GC {}", cache.getFastCopyVersion()); - cache.garbageCollect(); - return false; - } + final long start = System.currentTimeMillis(); + flush(cache, state, dataSource); + cache.release(); + final long end = System.currentTimeMillis(); + flushed.set(true); + flushLatch.countDown(); + statistics.recordFlush(end - start); + logger.debug( + VIRTUAL_MERKLE_STATS.getMarker(), + "Flushed {} v{} in {} ms", + state.getLabel(), + cache.getFastCopyVersion(), + end - start); } private void flush(VirtualNodeCache cacheToFlush, VirtualStateAccessor stateToUse, VirtualDataSource ds) { @@ -1478,7 +1472,6 @@ public void snapshot(final Path destination) throws IOException { final VirtualDataSource dataSourceCopy = dataSourceBuilder.copy(dataSource, false, true); try { final VirtualNodeCache cacheSnapshot = cache.snapshot(); - cacheSnapshot.prepareForFlush(); flush(cacheSnapshot, state, dataSourceCopy); dataSourceBuilder.snapshot(destination, dataSourceCopy); } finally { @@ -1549,7 +1542,6 @@ public void setupWithOriginalNode(final MerkleNode originalNode) { // will NEVER be updated again. assert originalMap.isHashed() : "The system should have made sure this was hashed by this point!"; final VirtualNodeCache snapshotCache = originalMap.cache.snapshot(); - snapshotCache.prepareForFlush(); flush(snapshotCache, originalMap.state, this.dataSource); return new RecordAccessorImpl<>(reconnectState, snapshotCache, keySerializer, valueSerializer, dataSource); @@ -1815,7 +1807,7 @@ private void add(final K key, final V value) { statistics.setSize(state.size()); final VirtualLeafRecord newLeaf = new VirtualLeafRecord<>(leafPath, key, value); - cache.putLeaf(newLeaf, true); + cache.putLeaf(newLeaf); } /** diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/pipeline/VirtualPipeline.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/pipeline/VirtualPipeline.java index 70953efda2d0..29d988c9906c 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/pipeline/VirtualPipeline.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/pipeline/VirtualPipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2021-2024 Hedera Hashgraph, LLC + * Copyright (C) 2021-2025 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -477,20 +477,18 @@ private long currentTotalSize() { } /** - * Try to flush a copy. Hash it if necessary. If the copy is not flushed, it - * will be eligible to merge. + * Flush a copy. Hash it if necessary. * * @param copy the copy to flush - * @return if the copy was flushed */ - private boolean tryFlush(final VirtualRoot copy) { + private void flush(final VirtualRoot copy) { if (copy.isFlushed()) { throw new IllegalStateException("copy is already flushed"); } if (!copy.isHashed()) { hashCopy(copy); } - return copy.flush(); + copy.flush(); } /** @@ -544,20 +542,14 @@ private void hashFlushMerge() { if (!copy.isImmutable()) { break; } - boolean flushed = false; if ((next == copies.getFirst()) && shouldBeFlushed(copy)) { - logger.debug(VIRTUAL_MERKLE_STATS.getMarker(), "Try to flush {}", copy.getFastCopyVersion()); - flushed = tryFlush(copy); - if (flushed) { - logger.debug(VIRTUAL_MERKLE_STATS.getMarker(), "Flushed {}", copy.getFastCopyVersion()); - copies.remove(next); - } - } - if (!flushed && (canBeMerged(next))) { + logger.debug(VIRTUAL_MERKLE_STATS.getMarker(), "Flush {}", copy.getFastCopyVersion()); + flush(copy); + copies.remove(next); + } else if (canBeMerged(next)) { assert !copy.isMerged(); logger.debug(VIRTUAL_MERKLE_STATS.getMarker(), "Merge {}", copy.getFastCopyVersion()); merge(next); - logger.debug(VIRTUAL_MERKLE_STATS.getMarker(), "Merged {}", copy.getFastCopyVersion()); copies.remove(next); } statistics.setPipelineSize(copies.getSize()); diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/pipeline/VirtualRoot.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/pipeline/VirtualRoot.java index f12fa32c1e5c..f454907272b0 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/pipeline/VirtualRoot.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/pipeline/VirtualRoot.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2021-2024 Hedera Hashgraph, LLC + * Copyright (C) 2021-2025 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,10 +48,8 @@ public interface VirtualRoot exten * returning true are guaranteed to be flushed, but other copies may be flushed, too. * *

This method can be expensive and may block for a long time before returning. - * - * @return if the copy has been flushed */ - boolean flush(); + void flush(); /** * Check if this copy has already been flushed. diff --git a/platform-sdk/swirlds-virtualmap/src/test/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCacheTest.java b/platform-sdk/swirlds-virtualmap/src/test/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCacheTest.java index 1bb6d200f53b..476020123103 100644 --- a/platform-sdk/swirlds-virtualmap/src/test/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCacheTest.java +++ b/platform-sdk/swirlds-virtualmap/src/test/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCacheTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2021-2024 Hedera Hashgraph, LLC + * Copyright (C) 2021-2025 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,6 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -139,7 +138,6 @@ void buildATree() { // hashes are calculated and put to the cache. Here the cache doesn't contain hashes for dirty leaves // (bananaLeaf0, appleLeaf0, cherryLeaf0). Should dirtyHashes() include these leaf nodes? Currently // it doesn't - cache0.prepareForFlush(); validateDirtyInternals(Set.of(rootInternal0, leftInternal0), cache0.dirtyHashesForFlush(4)); // ROUND 1: Add D and E. @@ -201,8 +199,6 @@ void buildATree() { dateLeaf1, appleLeaf1, eggplantLeaf1)); - // prepareForFlush() removes version 0 mutations for paths 2 and 3 - cache1.prepareForFlush(); validateDirtyInternals( Set.of(rootInternal1, leftInternal1, rightInternal1, leftLeftInternal1), cache1.dirtyHashesForFlush(8)); @@ -274,8 +270,6 @@ void buildATree() { figLeaf2, bananaLeaf2, grapeLeaf2)); - // prepareForFlush() removes version 1 mutations for paths 4 and 5 - cache2.prepareForFlush(); validateDirtyInternals( Set.of(rootInternal2, leftInternal2, rightInternal2, leftRightInternal2, rightLeftInternal2), cache2.dirtyHashesForFlush(12)); @@ -361,8 +355,6 @@ void buildATree() { cache3.putHash(leftInternal3); cache3.putHash(rootInternal3); cache3.seal(); - // prepareForFlush() removes version 1 mutations for paths 6 and 7 - cache3.prepareForFlush(); validateDirtyInternals( Set.of( rootInternal3, @@ -376,14 +368,7 @@ void buildATree() { // At this point, we have built the tree successfully. Verify one more time that each version of // the cache still sees things the same way it did at the time the copy was made. final VirtualNodeCache cache4 = cache; - validateTree( - cache0, - asList( - rootInternal0, - leftInternal0, - null, // became internal in version 1 - null, // became internal in version 1 - null)); // became internal in version 2 + validateTree(cache0, asList(rootInternal0, leftInternal0, bananaLeaf0, appleLeaf0, cherryLeaf0)); validateTree( cache1, asList( @@ -391,10 +376,10 @@ void buildATree() { leftInternal1, rightInternal1, leftLeftInternal1, - null, // became internal in version 1 - null, // became internal in version 1 - null, // became internal in version 2 - null, // became internal in version 2, then updated in version 3 + cherryLeaf0, + bananaLeaf1, + dateLeaf1, + appleLeaf1, eggplantLeaf1)); validateTree( cache2, @@ -405,13 +390,13 @@ void buildATree() { leftLeftInternal1, leftRightInternal2, rightLeftInternal2, - null, // updated in version 3 - null, // updated in version 3 + dateLeaf1, + appleLeaf1, eggplantLeaf1, cherryLeaf2, - null, // updated in version 3 - null, // updated in version 3 - null)); // updated in version 3 + figLeaf2, + bananaLeaf2, + grapeLeaf2)); validateTree( cache3, asList( @@ -457,9 +442,9 @@ void buildATree() { rightInternal1, leftLeftInternal1, null, - null, - null, - null, + bananaLeaf1, + dateLeaf1, + appleLeaf1, eggplantLeaf1)); }, Duration.ofSeconds(1), @@ -475,13 +460,13 @@ void buildATree() { leftLeftInternal1, leftRightInternal2, rightLeftInternal2, - null, - null, + dateLeaf1, + appleLeaf1, eggplantLeaf1, cherryLeaf2, - null, - null, - null)); + figLeaf2, + bananaLeaf2, + grapeLeaf2)); }, Duration.ofSeconds(1), "expected cache2 to eventually become clean"); @@ -545,9 +530,9 @@ void buildATree() { null, null, cherryLeaf2, - null, - null, - null)); + figLeaf2, + bananaLeaf2, + grapeLeaf2)); }, Duration.ofSeconds(1), "expected cache2 to eventually become clean"); @@ -564,7 +549,7 @@ void buildATree() { rightLeftInternal3, dogLeaf3, grapeLeaf3, - null, // E hasn't changed since version 1 + null, cherryLeaf2, foxLeaf3, bananaLeaf3, @@ -1107,7 +1092,6 @@ void mergingTheSameKeyRetainsTheMostRecentMutation() { cache1.lookupLeafByKey(B_KEY, false), "value that was looked up should match original value"); - cache1.prepareForFlush(); final List> dirtyLeaves = cache1.dirtyLeavesForFlush(1, 1).toList(); assertEquals(1, dirtyLeaves.size(), "incorrect number of dirty leaves"); @@ -1304,7 +1288,6 @@ void mergeStressTest() { // Verify everything final AtomicInteger index = new AtomicInteger(0); - cache1.prepareForFlush(); cache1.dirtyLeavesForFlush(totalMutationCount, totalMutationCount * 2) .sorted(Comparator.comparingLong(VirtualLeafRecord::getPath)) .forEach(rec -> { @@ -2584,7 +2567,6 @@ void dirtyLeaves_allInSameVersionAllDeleted() { cache.deleteLeaf(cherryLeaf(1)); cache.seal(); - cache.prepareForFlush(); final List> leaves = cache.dirtyLeavesForFlush(-1, -1).toList(); assertEquals(0, leaves.size(), "All leaves should be missing"); @@ -2649,7 +2631,6 @@ void dirtyLeaves_differentVersionsNoneDeleted() { cache0.merge(); cache1.merge(); - cache2.prepareForFlush(); final Set> leaves = cache2.dirtyLeavesForFlush(4, 8).collect(Collectors.toSet()); assertEquals(5, leaves.size(), "All leaves should be dirty"); @@ -2689,7 +2670,6 @@ void dirtyLeaves_differentVersionsSomeDeleted() { cache0.merge(); cache1.merge(); - cache2.prepareForFlush(); final Set> leaves = cache2.dirtyLeavesForFlush(3, 6).collect(Collectors.toSet()); assertEquals(4, leaves.size(), "Some leaves should be dirty"); @@ -2730,7 +2710,6 @@ void dirtyLeaves_differentVersionsAllDeleted() { cache0.merge(); cache1.merge(); - cache2.prepareForFlush(); final List> leaves = cache2.dirtyLeavesForFlush(-1, -1).toList(); assertEquals(0, leaves.size(), "All leaves should be deleted"); @@ -2751,7 +2730,6 @@ void dirtyInternals_allInSameVersionNoneDeleted() { cache0.putHash(rightLeftInternal()); cache0.seal(); - cache0.prepareForFlush(); final List internals = cache0.dirtyHashesForFlush(12).toList(); assertEquals(6, internals.size(), "All internals should be dirty"); assertEquals(rootInternal(), internals.get(0), "Unexpected internal"); @@ -2779,7 +2757,6 @@ void dirtyInternals_differentVersionsNoneDeleted() { cache1.seal(); cache0.merge(); - cache1.prepareForFlush(); final List internals = cache1.dirtyHashesForFlush(12).toList(); assertEquals(6, internals.size(), "All internals should be dirty"); assertEquals( @@ -2823,7 +2800,6 @@ void dirtyInternals_differentVersionsSomeDeleted() { cache0.merge(); cache1.merge(); - cache2.prepareForFlush(); final List internals = cache2.dirtyHashesForFlush(12).toList(); assertEquals(6, internals.size(), "All internals should be dirty"); assertEquals( @@ -2869,7 +2845,6 @@ void dirtyInternals_differentVersionsAllDeleted() { cache0.merge(); cache1.merge(); - cache2.prepareForFlush(); final List internals = cache2.dirtyHashesForFlush(-1).toList(); assertEquals(0, internals.size(), "No internals should be dirty"); } @@ -2901,85 +2876,12 @@ void dirtyLeaves_flushesAndHashing() { cache1.dirtyLeavesForHash(2, 4).toList(); assertEquals(List.of(appleLeaf(3), cherryLeaf(4)), dirtyLeaves1); - cache0.prepareForFlush(); // Flush version 0 final Set> dirtyLeaves0F = cache0.dirtyLeavesForFlush(1, 2).collect(Collectors.toSet()); assertEquals(Set.of(appleLeaf(1), bananaLeaf(2)), dirtyLeaves0F); } - @Test - @Tags({@Tag("VirtualMerkle"), @Tag("VirtualNodeCache")}) - void addedThenDeletedLeaves() { - final VirtualNodeCache cache0 = cache; - // add A - cache0.putLeaf(appleLeaf(1)); - // add B - cache0.putLeaf(bananaLeaf(2)); - - nextRound(); - System.err.println( - "H0: " + Arrays.toString(cache0.dirtyLeavesForHash(1, 2).toArray())); - cache0.seal(); - - final VirtualNodeCache cache1 = cache; - // add C - cache1.clearLeafPath(1); - cache1.putLeaf(appleLeaf(3)); - cache1.putLeaf(cherryLeaf(4)); - // add D - cache1.clearLeafPath(2); - cache1.putLeaf(bananaLeaf(5)); - cache1.putLeaf(dateLeaf(6)); - - nextRound(); - System.err.println( - "H1: " + Arrays.toString(cache1.dirtyLeavesForHash(3, 6).toArray())); - cache1.seal(); - - final VirtualNodeCache cache2 = cache; - // delete A - cache2.deleteLeaf(appleLeaf(3)); - cache2.clearLeafPath(6); - cache2.putLeaf(dateLeaf(3)); - cache2.clearLeafPath(5); - cache2.putLeaf(bananaLeaf(2)); - - nextRound(); - System.err.println( - "H2: " + Arrays.toString(cache2.dirtyLeavesForHash(2, 4).toArray())); - cache2.seal(); - - final VirtualNodeCache cache3 = cache; - // add E - cache3.clearLeafPath(2); - cache3.putLeaf(dateLeaf(5)); - cache3.putLeaf(eggplantLeaf(6)); - // add G - cache3.clearLeafPath(3); - cache3.putLeaf(dateLeaf(7)); - cache3.putLeaf(grapeLeaf(8)); - - nextRound(); - System.err.println( - "H3: " + Arrays.toString(cache3.dirtyLeavesForHash(4, 8).toArray())); - cache3.seal(); - - cache0.merge(); - cache1.merge(); - cache2.merge(); - - cache3.prepareForFlush(); - final List> dirtyLeaves = - cache3.dirtyLeavesForFlush(4, 8).toList(); - System.err.println(dirtyLeaves); - final List> deletedLeaves = - cache3.deletedLeaves().toList(); - System.err.println(deletedLeaves); - - assertEquals(1, deletedLeaves.size()); - } - // ---------------------------------------------------------------------- // Test Utility methods // ---------------------------------------------------------------------- diff --git a/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNodeTest.java b/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNodeTest.java index 3ece70c4ba7a..06112c8ef10b 100644 --- a/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNodeTest.java +++ b/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNodeTest.java @@ -16,7 +16,6 @@ package com.swirlds.virtualmap.internal.merkle; -import static com.swirlds.common.test.fixtures.AssertionUtils.assertEventuallyTrue; import static com.swirlds.common.test.fixtures.RandomUtils.nextInt; import static com.swirlds.virtualmap.test.fixtures.VirtualMapTestUtils.CONFIGURATION; import static com.swirlds.virtualmap.test.fixtures.VirtualMapTestUtils.createRoot; @@ -59,9 +58,6 @@ import java.io.InputStream; import java.io.UncheckedIOException; import java.nio.file.Path; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -514,300 +510,6 @@ void defaultZeroFlushThresholdTest() { root.release(); } - @Test - void inMemoryAddRemoveNoFlushTest() throws InterruptedException { - final Configuration configuration = new TestConfigBuilder() - .withValue(VirtualMapConfig_.COPY_FLUSH_THRESHOLD, 1_000_000) - .getOrCreateConfig(); - - VirtualRootNode root = new VirtualRootNode<>( - TestKeySerializer.INSTANCE, - TestValueSerializer.INSTANCE, - new InMemoryBuilder(), - configuration.getConfigData(VirtualMapConfig.class)); - - final VirtualRootNode copy0 = root; - VirtualMapState state = new VirtualMapState("label"); - copy0.postInit(new VirtualStateAccessorImpl(state)); - for (int i = 0; i < 100; i++) { - final TestKey key = new TestKey(i); - final TestValue value = new TestValue(1000000 + i); - root.put(key, value); - } - - // Here is the test: in every copy, add 100 elements. In the same copy, delete all elements - // added in the previous copy. Every copy will contain no more than 200 elements, therefore - // its effective size will be small, so none of the copies should be flushed to disk - final int nCopies = 10000; - final VirtualRootNode[] copies = new VirtualRootNode[nCopies]; - copies[0] = root; - for (int copyNo = 1; copyNo < nCopies; copyNo++) { - final VirtualRootNode copy = root.copy(); - copies[copyNo] = copy; - state = state.copy(); - copy.postInit(new VirtualStateAccessorImpl(state)); - root.release(); - root = copy; - for (int i = 0; i < 100; i++) { - final int toAdd = copyNo * 100 + i; - final TestKey keyToAdd = new TestKey(toAdd); - final TestValue value = new TestValue(1000000 + toAdd); - root.put(keyToAdd, value); - final int toRemove = (copyNo - 1) * 100 + i; - final TestKey keytoRemove = new TestKey(toRemove); - root.remove(keytoRemove); - } - } - - // The last two copies should not be checked: the last one is mutable, the one before is not - // mergeable until its next copy is immutable - for (int i = 0; i < nCopies - 2; i++) { - // Copies must be merged, not flushed - assertEventuallyTrue(copies[i]::isMerged, Duration.ofSeconds(16), "copy " + i + " should be merged"); - } - - final var lcopy = root.copy(); - lcopy.postInit(new VirtualStateAccessorImpl(state)); - root.enableFlush(); - root.release(); - root.waitUntilFlushed(); - root = lcopy; - - // Values from copies 0 to nCopies - 2 should not be there (removed) - for (int copyNo = 0; copyNo < nCopies - 2; copyNo++) { - for (int i = 0; i < 100; i++) { - final int toCheck = copyNo * 100 + i; - final TestKey keyToCheck = new TestKey(toCheck); - final TestValue value = root.get(keyToCheck); - assertNull(value); - final VirtualLeafRecord leafRec = - root.getCache().lookupLeafByKey(keyToCheck, false); - assertNull(leafRec); - } - } - } - - @Test - void inMemoryManyAddManyRemoveNoFlushTest() throws InterruptedException { - final Configuration configuration = new TestConfigBuilder() - .withValue(VirtualMapConfig_.COPY_FLUSH_THRESHOLD, 1_000_000) - .getOrCreateConfig(); - - VirtualRootNode root = new VirtualRootNode<>( - TestKeySerializer.INSTANCE, - TestValueSerializer.INSTANCE, - new InMemoryBuilder(), - configuration.getConfigData(VirtualMapConfig.class)); - - final VirtualRootNode copy0 = root; - VirtualMapState state = new VirtualMapState("label"); - copy0.postInit(new VirtualStateAccessorImpl(state)); - - final int nCopies = 100; - final VirtualRootNode[] copies = new VirtualRootNode[nCopies]; - copies[0] = root; - - // Here is the test: every elemement is added in one copy and then removed in the - // next copy. Every copy will contain no more than 500 elements, therefore its - // effective size will be small, so none of the copies should be flushed to disk - for (int copyNo = 1; copyNo < nCopies; copyNo++) { - final VirtualRootNode copy = root.copy(); - copies[copyNo] = copy; - state = state.copy(); - copy.postInit(new VirtualStateAccessorImpl(state)); - root.release(); - root = copy; - final int N = 1000; - final List l = new ArrayList<>(N); - for (int i = 0; i < N; i++) { - l.add(i); - } - Collections.shuffle(l); - for (int i = 0; i < N; i++) { - final int keyIndex = l.get(i); - final TestKey key = new TestKey(keyIndex); - if (i % 2 == copyNo % 2) { // add - final TestValue value = new TestValue(1000000 + keyIndex); - root.put(key, value); - } else { // remove - root.remove(key); - } - } - } - - // The last two copies should not be checked: the last one is mutable, the one before is not - // mergeable until its next copy is immutable - for (int i = 0; i < nCopies - 2; i++) { - final VirtualRootNode copy = copies[i]; - // Copies must be merged, not flushed - assertEventuallyTrue(() -> copy.isMerged(), Duration.ofSeconds(16), "copy " + i + " should be merged"); - } - - root.release(); - } - - @Test - void inMemoryAddRemoveSomeFlushesTest() { - final Configuration configuration = new TestConfigBuilder() - .withValue(VirtualMapConfig_.COPY_FLUSH_THRESHOLD, 1_000_000) - .getOrCreateConfig(); - - VirtualRootNode root = new VirtualRootNode<>( - TestKeySerializer.INSTANCE, - TestValueSerializer.INSTANCE, - new InMemoryBuilder(), - configuration.getConfigData(VirtualMapConfig.class)); - - final int nCopies = 1000; - final VirtualRootNode[] copies = new VirtualRootNode[nCopies]; - - final VirtualRootNode copy0 = root; - copies[0] = copy0; - VirtualMapState state = new VirtualMapState("label"); - copy0.postInit(new VirtualStateAccessorImpl(state)); - for (int i = 0; i < 100; i++) { - final TestKey key = new TestKey(i); - final TestValue value = new TestValue(1000000 + i); - root.put(key, value); - } - - final VirtualRootNode copy1 = root.copy(); - copies[1] = copy1; - state = state.copy(); - copy1.postInit(new VirtualStateAccessorImpl(state)); - root.release(); - root = copy1; - for (int i = 100; i < 200; i++) { - final TestKey key = new TestKey(i); - final TestValue value = new TestValue(1000000 + i); - root.put(key, value); - } - - // Here is the test: in every copy, add 100 elements. In the same copy, delete all elements - // added in the previous copy. In the same copy, re-add the same elements that were added - // two copies ago. It will cause copies to grow in size, so eventually some copies must be - // flushed - for (int copyNo = 2; copyNo < nCopies; copyNo++) { - final VirtualRootNode copy = root.copy(); - copies[copyNo] = copy; - state = state.copy(); - copy.postInit(new VirtualStateAccessorImpl(state)); - root.release(); - root = copy; - for (int i = 0; i < 100; i++) { - // Add - final int toAdd = copyNo * 100 + i; - final TestKey keyToAdd = new TestKey(toAdd); - final TestValue value = new TestValue(1000000 + toAdd); - root.put(keyToAdd, value); - // Remove - final int toRemove = (copyNo - 1) * 100 + i; - final TestKey keytoRemove = new TestKey(toRemove); - root.remove(keytoRemove); - // Re-add - final int toReAdd = (copyNo - 2) * 100 + i; - final TestKey keytoReAdd = new TestKey(toReAdd); - final TestValue valueToReAdd = new TestValue(1000000 + toReAdd); - root.put(keytoReAdd, valueToReAdd); - } - } - - // The last two copies should not be checked: the last one is mutable, the one before is not - // mergeable until its next copy is immutable - int merged = 0; - int flushed = 0; - for (int i = 0; i < nCopies - 2; i++) { - final VirtualRootNode copy = copies[i]; - // Copies must be merged, not flushed - assertEventuallyTrue( - () -> copy.isMerged() || copy.isFlushed(), - Duration.ofSeconds(8), - "copy " + i + " should be merged or flushed"); - if (copy.isMerged()) { - merged++; - } - if (copy.isFlushed()) { - flushed++; - } - } - assertTrue(merged > 0, "At least one copy must be merged"); - assertTrue(flushed > 0, "At least one copy must be flushed"); - assertTrue(merged > flushed, "More copies must be merged than flushed"); - - // All values from copies 0 to nCopies - 2 should be available (re-added) - for (int copyNo = 0; copyNo < nCopies - 2; copyNo++) { - for (int i = 0; i < 100; i++) { - final int toCheck = copyNo * 100 + i; - final TestKey keyToCheck = new TestKey(toCheck); - final TestValue value = root.get(keyToCheck); - assertNotNull(value); - final int expected = 1000000 + toCheck; - assertEquals("Value " + expected, value.value()); - } - } - // Values from copy nCopies - 2 should not be there (removed) - for (int i = 0; i < 100; i++) { - final int toCheck = (nCopies - 2) * 100 + i; - final TestKey keyToCheck = new TestKey(toCheck); - final TestValue value = root.get(keyToCheck); - assertNull(value); - } - // Values from copy nCopies - 1 should be there (added) - for (int i = 0; i < 100; i++) { - final int toCheck = (nCopies - 1) * 100 + i; - final TestKey keyToCheck = new TestKey(toCheck); - final TestValue value = root.get(keyToCheck); - assertNotNull(value); - final int expected = 1000000 + toCheck; - assertEquals("Value " + expected, value.value()); - } - - root.release(); - } - - @Test - void inMemoryUpdateNoFlushTest() { - final Configuration configuration = new TestConfigBuilder() - .withValue(VirtualMapConfig_.COPY_FLUSH_THRESHOLD, 1_000_000) - .getOrCreateConfig(); - - VirtualRootNode root = new VirtualRootNode<>( - TestKeySerializer.INSTANCE, - TestValueSerializer.INSTANCE, - new InMemoryBuilder(), - configuration.getConfigData(VirtualMapConfig.class)); - VirtualMapState state = new VirtualMapState("label"); - root.postInit(new VirtualStateAccessorImpl(state)); - - // Here is the test: add/update 1000 elements in every copy. Number of mutations in the - // node cache will grow, but total number of entities in the map will not. Without in-memory - // maps, it would result in some flushes, and with in-memory support, all copies should be - // GC'ed and then merged - final int nCopies = 1000; - final VirtualRootNode[] copies = new VirtualRootNode[nCopies]; - copies[0] = root; - for (int copyNo = 1; copyNo < nCopies; copyNo++) { - final VirtualRootNode copy = root.copy(); - copies[copyNo] = copy; - state = state.copy(); - copy.postInit(new VirtualStateAccessorImpl(state)); - root.release(); - root = copy; - for (int i = 0; i < 1000; i++) { - final TestKey keyToAdd = new TestKey(i); - final TestValue value = new TestValue(1000000 + i); - root.put(keyToAdd, value); - } - } - - // The last two copies should not be checked: the last one is mutable, the one before is not - // mergeable until its next copy is immutable - for (int i = 0; i < nCopies - 2; i++) { - // Copies must be merged, not flushed - assertEventuallyTrue(copies[i]::isMerged, Duration.ofSeconds(16), "copy " + i + " should be merged"); - } - } - @Test @DisplayName("Copy of a root node with terminated pipeline") void copyOfRootNodeWithTerminatedPipeline() { diff --git a/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/DummyVirtualRoot.java b/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/DummyVirtualRoot.java index c478a9db87fb..bae4e392346f 100644 --- a/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/DummyVirtualRoot.java +++ b/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/DummyVirtualRoot.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2021-2024 Hedera Hashgraph, LLC + * Copyright (C) 2021-2025 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -216,7 +216,7 @@ public void overrideImmutable(final boolean immutable) { * {@inheritDoc} */ @Override - public boolean flush() { + public void flush() { if (flushed) { throw new IllegalStateException("copy is already flushed"); } @@ -265,8 +265,6 @@ public boolean flush() { flushLatch.countDown(); statistics.recordFlush(copyIndex); // Use copyIndex as flush duration - - return true; } private static boolean shouldBeFlushed(DummyVirtualRoot copy) { diff --git a/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/NoOpVirtualRoot.java b/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/NoOpVirtualRoot.java index 1ac997cff77f..f20d14190ab5 100644 --- a/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/NoOpVirtualRoot.java +++ b/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/NoOpVirtualRoot.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016-2024 Hedera Hashgraph, LLC + * Copyright (C) 2016-2025 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,9 +68,7 @@ public boolean shouldBeFlushed() { } @Override - public boolean flush() { - return true; - } + public void flush() {} @Override public boolean isFlushed() { diff --git a/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/VirtualPipelineTests.java b/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/VirtualPipelineTests.java index 531d9d9dcc00..a1a7ce0acdab 100644 --- a/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/VirtualPipelineTests.java +++ b/platform-sdk/swirlds-virtualmap/src/timingSensitive/java/com/swirlds/virtualmap/internal/pipeline/VirtualPipelineTests.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2021-2024 Hedera Hashgraph, LLC + * Copyright (C) 2021-2025 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -616,7 +616,7 @@ public SlowVirtualRoot copy() { } @Override - public boolean flush() { + public void flush() { try { if (!flushFinishedLatch.await(30, TimeUnit.SECONDS)) { throw new RuntimeException("Wait exceeded"); @@ -625,7 +625,7 @@ public boolean flush() { Thread.currentThread().interrupt(); throw new RuntimeException(ex); } - return super.flush(); + super.flush(); } @Override