Skip to content

Commit

Permalink
Improved virtual node cache flushes and purges
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Ananev <[email protected]>
  • Loading branch information
artemananiev committed Nov 5, 2024
1 parent 407d2be commit 59fc16a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private static class ConcurrentArraySpliterator<T> extends AbstractSpliterator<T
private int arrIndex = 0;

ConcurrentArraySpliterator(final int size, final SubArray<T> head) {
super(size, Spliterator.SIZED | Spliterator.NONNULL | Spliterator.IMMUTABLE);
super(size, Spliterator.SIZED | Spliterator.NONNULL | Spliterator.IMMUTABLE | Spliterator.CONCURRENT);
arr = head;
skipEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ private static synchronized Executor getCleaningPool() {
* <p>
* <strong>ONE PER CACHE INSTANCE</strong>.
*/
private ConcurrentArray<Mutation<K, VirtualLeafRecord<K, V>>> dirtyLeaves = new ConcurrentArray<>();
private volatile ConcurrentArray<Mutation<K, VirtualLeafRecord<K, V>>> dirtyLeaves = new ConcurrentArray<>();

/**
* When the cache is filtered using {@link #filterMutations(ConcurrentArray, Map, long, AtomicLong)},
* When the cache is filtered using {@link #filterMutations(ConcurrentArray, Map, long, long, AtomicLong)},
* this field stores the number of filtered mutations in {@link #dirtyLeaves}. This number
* affects cache's estimated size.
*/
Expand All @@ -285,10 +285,10 @@ private static synchronized Executor getCleaningPool() {
* <p>
* <strong>ONE PER CACHE INSTANCE</strong>.
*/
private ConcurrentArray<Mutation<Long, K>> dirtyLeafPaths = new ConcurrentArray<>();
private volatile ConcurrentArray<Mutation<Long, K>> dirtyLeafPaths = new ConcurrentArray<>();

/**
* When the cache is filtered using {@link #filterMutations(ConcurrentArray, Map, long, AtomicLong)},
* When the cache is filtered using {@link #filterMutations(ConcurrentArray, Map, long, long, AtomicLong)},
* this field stores the number of filtered mutations in {@link #dirtyLeafPaths}. This number
* affects cache's estimated size.
*/
Expand All @@ -302,10 +302,10 @@ private static synchronized Executor getCleaningPool() {
* <p>
* <strong>ONE PER CACHE INSTANCE</strong>.
*/
private ConcurrentArray<Mutation<Long, Hash>> dirtyHashes = new ConcurrentArray<>();
private volatile ConcurrentArray<Mutation<Long, Hash>> dirtyHashes = new ConcurrentArray<>();

/**
* When the cache is filtered using {@link #filterMutations(ConcurrentArray, Map, long, AtomicLong)},
* When the cache is filtered using {@link #filterMutations(ConcurrentArray, Map, long, long, AtomicLong)},
* this field stores the number of filtered mutations in {@link #dirtyHashes}. This number
* affects cache's estimated size.
*/
Expand Down Expand Up @@ -1066,9 +1066,10 @@ public void prepareForFlush() {
// Mark obsolete mutations to filter later and update "filtered" counters. These
// counters will affect the estimated size
final long version = getFastCopyVersion();
filterMutations(dirtyHashes, pathToDirtyHashIndex, version, filteredHashesCount);
filterMutations(dirtyLeafPaths, pathToDirtyLeafIndex, version, filteredLeafPathsCount);
filterMutations(dirtyLeaves, keyToDirtyLeafIndex, version, filteredLeavesCount);
final long lastReleasedVersion = lastReleased.get();
filterMutations(dirtyHashes, pathToDirtyHashIndex, version, lastReleasedVersion, filteredHashesCount);
filterMutations(dirtyLeafPaths, pathToDirtyLeafIndex, version, lastReleasedVersion, filteredLeafPathsCount);
filterMutations(dirtyLeaves, keyToDirtyLeafIndex, version, lastReleasedVersion, filteredLeavesCount);
}

/**
Expand All @@ -1085,24 +1086,18 @@ public void garbageCollect(final long firstLeafPath, final long lastLeafPath) {
throw new MutabilityException("Cannot run garbage collection for a non-sealed cache");

Check warning on line 1086 in platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java#L1086

Added line #L1086 was not covered by tests
}
final Stream<Mutation<Long, Hash>> filteredHashes = dirtyHashes.stream()
.filter(mutation -> mutation.key <= lastLeafPath)
.filter(mutation -> !mutation.isFiltered())
// Leave only the latest mutation for every hash path by setting next to null
.peek(mutation -> mutation.next = null);
dirtyHashes = new ConcurrentArray<>(filteredHashes);
filteredHashesCount.set(0);
final Stream<Mutation<Long, K>> filteredLeafPaths = dirtyLeafPaths.stream()
.filter(mutation -> (mutation.key >= firstLeafPath) && (mutation.key <= lastLeafPath))
.filter(mutation -> !mutation.isFiltered())
// Leave only the latest mutation for every leaf path by setting next to null
.peek(mutation -> mutation.next = null);
dirtyLeafPaths = new ConcurrentArray<>(filteredLeafPaths);
filteredLeafPathsCount.set(0);
final Stream<Mutation<K, VirtualLeafRecord<K, V>>> filteredLeaves = dirtyLeaves.stream()
.filter(mutation -> {
final long path = mutation.value.getPath();
return path >= firstLeafPath && path <= lastLeafPath;
})
.filter(mutation -> !mutation.isFiltered())
// Leave only the latest mutation for every leaf by setting next to null
.peek(mutation -> mutation.next = null);
Expand Down Expand Up @@ -1398,21 +1393,24 @@ private Mutation<K, VirtualLeafRecord<K, V>> mutate(
* The value type referenced by the mutation list
*/
private static <K, V> void purge(final ConcurrentArray<Mutation<K, V>> array, final Map<K, Mutation<K, V>> index) {
array.parallelTraverse(
getCleaningPool(),
element -> index.compute(element.key, (key, mutation) -> {
if (mutation == null || element.equals(mutation)) {
// Already removed for a more recent mutation
return null;
}
for (Mutation<K, V> m = mutation; m.next != null; m = m.next) {
if (element.equals(m.next)) {
m.next = null;
break;
}
array.parallelTraverse(getCleaningPool(), element -> {
if (element.isFiltered()) {
return;
}
index.compute(element.key, (key, mutation) -> {
if (mutation == null || element.equals(mutation)) {
// Already removed for a more recent mutation
return null;
}
for (Mutation<K, V> m = mutation; m.next != null; m = m.next) {
if (element.equals(m.next)) {
m.next = null;
break;
}
return mutation;
}));
}
return mutation;
});
});
}

/**
Expand All @@ -1429,6 +1427,7 @@ private static <K, V> void purge(final ConcurrentArray<Mutation<K, V>> array, fi
* @param index the corresponding index, it's used to look up the newest mutations
* for a key
* @param newestVersion the newest version of all mutations in the array
* @param lastReleasedVersion the latest flushed version
* @param <K>
* The key type used in the index
* @param <V>
Expand All @@ -1438,36 +1437,55 @@ private static <K, V> void filterMutations(
final ConcurrentArray<Mutation<K, V>> array,
final Map<K, Mutation<K, V>> index,
final long newestVersion,
final long lastReleasedVersion,
final AtomicLong filteredCounter) {
filteredCounter.set(0);
final Consumer<Mutation<K, V>> action = mutation -> {
// local variable is required because mutation.next can be changed by another thread to null
// see https://github.com/hashgraph/hedera-services/issues/7046 for the context
Mutation<K, V> nextMutation = mutation.next;
mutation.next = null;
if (nextMutation != null) {
assert !nextMutation.isFiltered();
nextMutation.setFiltered();
filteredCounter.incrementAndGet();
// There may be older mutations being purged in parallel, they should not contribute
// to the "filtered" counter
if (!nextMutation.isFiltered() && (nextMutation.version > lastReleasedVersion)) {
nextMutation.setFiltered();
filteredCounter.incrementAndGet();
}
if (nextMutation.isNew()) {
// nextMutation is to put a new element into a virtual map. The element doesn't
// exist in the data source. If this mutation is filtered, there must be a newer
// mutation for the same key. If that newer mutation has the "deleted" flag, the
// element should never be flushed to disk
final Mutation<K, V> latestMutation = index.get(nextMutation.key);
final Mutation<K, V> latestMutation = index.get(mutation.key);
assert latestMutation != null;
final Mutation<K, V> latestMutationUpToVersion = lookup(latestMutation, newestVersion);
assert latestMutationUpToVersion != null;
assert !latestMutationUpToVersion.isFiltered();
if (latestMutationUpToVersion.isDeleted()) {
assert !latestMutationUpToVersion.isFiltered();
if (!latestMutationUpToVersion.isFiltered()) {
latestMutationUpToVersion.setFiltered();
filteredCounter.incrementAndGet();
}
// If the latest mutation up to newestVersion is "deleted", and there are no
// newer mutations, the whole entry for the key can be removed from the index.
// It's safe to do so here, as there are no references to copies older than
// newestVersion and there are no mutations in versions newer than newestVersion
if (latestMutation == latestMutationUpToVersion) {
latestMutationUpToVersion.setFiltered();
filteredCounter.incrementAndGet();
index.remove(latestMutation.key);
}
index.compute(mutation.key, (k, v) -> {
assert v != null;
if (v == latestMutationUpToVersion) {
return null;
}
Mutation<K, V> m = v;

Check warning on line 1480 in platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java#L1480

Added line #L1480 was not covered by tests
while (m.next != latestMutationUpToVersion) {
m = m.next;

Check warning on line 1482 in platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java#L1482

Added line #L1482 was not covered by tests
}
assert !m.isFiltered();
assert m.version > newestVersion;
m.next = null;
return v;

Check warning on line 1487 in platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/cache/VirtualNodeCache.java#L1486-L1487

Added lines #L1486 - L1487 were not covered by tests
});
} else {
// Propagate the "new" flag to the newer mutation
latestMutationUpToVersion.setNew();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ void defaultZeroFlushThresholdTest() {
}

@Test
void inMemoryAddRemoveNoFlushTest() {
void inMemoryAddRemoveNoFlushTest() throws InterruptedException {
final Configuration configuration = new TestConfigBuilder()
.withValue(VirtualMapConfig_.COPY_FLUSH_THRESHOLD, 1_000_000)
.getOrCreateConfig();
Expand Down Expand Up @@ -479,6 +479,26 @@ void inMemoryAddRemoveNoFlushTest() {
// Copies must be merged, not flushed
assertEventuallyTrue(copies[i]::isMerged, Duration.ofSeconds(16), "copy " + i + " should be merged");
}

final var lcopy = root.copy();
lcopy.postInit(new VirtualStateAccessorImpl(state));
root.enableFlush();
root.release();
root.waitUntilFlushed();
root = lcopy;

// Values from copies 0 to nCopies - 2 should not be there (removed)
for (int copyNo = 0; copyNo < nCopies - 2; copyNo++) {
for (int i = 0; i < 100; i++) {
final int toCheck = copyNo * 100 + i;
final TestKey keyToCheck = new TestKey(toCheck);
final TestValue value = root.get(keyToCheck);
assertNull(value);
final VirtualLeafRecord<TestKey, TestValue> leafRec =
root.getCache().lookupLeafByKey(keyToCheck, false);
assertNull(leafRec);
}
}
}

@Test
Expand Down

0 comments on commit 59fc16a

Please sign in to comment.