Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Onheap impl startree #25

1 change: 1 addition & 0 deletions .github/workflows/gradle-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ jobs:
if: success()
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./codeCoverage.xml

- name: Create Comment Success
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `azure-identity` from 1.11.4 to 1.13.0, Bump `msal4j` from 1.14.3 to 1.15.1, Bump `msal4j-persistence-extension` from 1.2.0 to 1.3.0 ([#14506](https://github.com/opensearch-project/OpenSearch/pull/14506))

### Changed
- [Tiered Caching] Move query recomputation logic outside write lock ([#14187](https://github.com/opensearch-project/OpenSearch/pull/14187))
- unsignedLongRangeQuery now returns MatchNoDocsQuery if the lower bounds are greater than the upper bounds ([#14416](https://github.com/opensearch-project/OpenSearch/pull/14416))
- Updated the `indices.query.bool.max_clause_count` setting from being static to dynamically updateable ([#13568](https://github.com/opensearch-project/OpenSearch/pull/13568))
- Make the class CommunityIdProcessor final ([#14448](https://github.com/opensearch-project/OpenSearch/pull/14448))
Expand Down
15 changes: 14 additions & 1 deletion TESTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ OpenSearch uses [jUnit](https://junit.org/junit5/) for testing, it also uses ran
- [Miscellaneous](#miscellaneous)
- [Running verification tasks](#running-verification-tasks)
- [Testing the REST layer](#testing-the-rest-layer)
- [Running REST Tests Against An External Cluster](#running-rest-tests-against-an-external-cluster)
- [Debugging REST Tests](#debugging-rest-tests)
- [Testing packaging](#testing-packaging)
- [Testing packaging on Windows](#testing-packaging-on-windows)
- [Testing VMs are disposable](#testing-vms-are-disposable)
Expand Down Expand Up @@ -272,7 +274,18 @@ yamlRestTest’s and javaRestTest’s are easy to identify, since they are found

If in doubt about which command to use, simply run <gradle path>:check

Note that the REST tests, like all the integration tests, can be run against an external cluster by specifying the `tests.cluster` property, which if present needs to contain a comma separated list of nodes to connect to (e.g. localhost:9300).
## Running REST Tests Against An External Cluster

Note that the REST tests, like all the integration tests, can be run against an external cluster by specifying the following properties `tests.cluster`, `tests.rest.cluster`, `tests.clustername`. Use a comma separated list of node properties for the multi-node cluster.

For example :

./gradlew :rest-api-spec:yamlRestTest \
-Dtests.cluster=localhost:9200 -Dtests.rest.cluster=localhost:9200 -Dtests.clustername=opensearch

## Debugging REST Tests

You can launch a local OpenSearch cluster in debug mode following [Launching and debugging from an IDE](#launching-and-debugging-from-an-ide), and run your REST tests against that following [Running REST Tests Against An External Cluster](#running-rest-tests-against-an-external-cluster).

# Testing packaging

Expand Down
4 changes: 4 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,7 @@ ${path.logs}
# Gates the functionality of enabling Opensearch to use pluggable caches with respective store names via setting.
#
#opensearch.experimental.feature.pluggable.caching.enabled: false
#
# Gates the functionality of star tree index, which improves the performance of search aggregations.
#
#opensearch.experimental.feature.composite_index.star_tree.enabled: true
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.cache.common.tier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
Expand Down Expand Up @@ -35,9 +37,13 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;
Expand All @@ -61,6 +67,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

// Used to avoid caching stale entries in lower tiers.
private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);
private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class);

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
Expand All @@ -86,6 +93,12 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
private final Map<ICache<K, V>, TierInfo> caches;
private final List<Predicate<V>> policies;

/**
* This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value
* only once.
*/
Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> completableFutureMap = new ConcurrentHashMap<>();

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
Expand Down Expand Up @@ -190,10 +203,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = null;
try (ReleasableLock ignore = writeLock.acquire()) {
value = onHeapCache.computeIfAbsent(key, loader);
}
V value = compute(key, loader);
// Handle stats
if (loader.isLoaded()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
Expand Down Expand Up @@ -222,6 +232,57 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
// Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
if (pair != null) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers.For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
}
} else {
if (ex != null) {
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
return null;
};
V value = null;
if (future == null) {
future = completableFutureMap.get(key);
future.handle(handler);
try {
value = loader.load(key);
} catch (Exception ex) {
future.completeExceptionally(ex);
throw new ExecutionException(ex);
}
if (value == null) {
NullPointerException npe = new NullPointerException("Loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Tuple<>(key, value));
}
} else {
try {
value = future.get().v2();
} catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
return value;
}

@Override
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
Expand Down Expand Up @@ -328,12 +389,22 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
ICacheKey<K> key = notification.getKey();
boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier
if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) {
boolean exceptionOccurredOnDiskCachePut = false;
boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue());
if (canCacheOnDisk) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
} catch (Exception ex) {
// TODO: Catch specific exceptions. Needs better exception handling. We are just swallowing exception
// in this case as it shouldn't cause upstream request to fail.
logger.warn("Exception occurred while putting item to disk cache", ex);
exceptionOccurredOnDiskCachePut = true;
}
updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
} else {
if (!exceptionOccurredOnDiskCachePut) {
updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
}
}
if (!canCacheOnDisk || exceptionOccurredOnDiskCachePut) {
// If the value is not going to the disk cache, send this notification to the TSC's removal listener
// as the value is leaving the TSC entirely
removalListener.onRemoval(notification);
Expand Down
Loading
Loading