From b413fbee9c0c07b4e11ebd1e83efee4c5c748c9a Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Wed, 3 Apr 2024 10:10:59 -0700 Subject: [PATCH 1/5] [Tiered Caching] Making took time policy dynamic and adding additional IT tests Signed-off-by: Sagar Upadhyaya --- .../TieredSpilloverCacheIT.java | 322 +++++++++++++++++- .../cache/common/policy/TookTimePolicy.java | 20 +- .../common/tier/TieredSpilloverCache.java | 87 ++++- .../tier/TieredSpilloverCachePlugin.java | 8 +- .../tier/TieredSpilloverCacheSettings.java | 30 +- .../common/policy/TookTimePolicyTests.java | 18 +- .../cache/common/tier/MockDiskCache.java | 53 ++- .../tier/TieredSpilloverCacheTests.java | 24 +- .../cache/store/config/CacheConfig.java | 14 + .../common/settings/ClusterSettings.java | 11 +- .../indices/IndicesRequestCache.java | 5 +- .../opensearch/indices/IndicesService.java | 2 +- .../indices/IndicesRequestCacheTests.java | 67 +++- 13 files changed, 607 insertions(+), 54 deletions(-) diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java index 568ac4d188c51..977a66c53b7e8 100644 --- a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java @@ -12,21 +12,31 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchType; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; +import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.IndicesRequestCache; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.PluginInfo; import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.hamcrest.OpenSearchAssertions; import org.junit.Assert; import java.time.ZoneId; @@ -34,15 +44,20 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; +import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING; import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.greaterThan; +@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST) public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase { @Override @@ -50,15 +65,9 @@ protected Collection> nodePlugins() { return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class); } - @Override - protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { + private Settings defaultSettings(String onHeapCacheSizeInBytesOrPecentage) { return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") .put( CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME @@ -75,10 +84,17 @@ protected Settings nodeSettings(int nodeOrdinal) { ).getKey(), MockDiskCache.MockDiskCacheFactory.NAME ) + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSizeInBytesOrPecentage + ) .build(); } public void testPluginsAreInstalled() { + internalCluster().startNode(Settings.builder().put(defaultSettings("1%")).build()); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); @@ -90,11 +106,12 @@ public void testPluginsAreInstalled() { .collect(Collectors.toList()); Assert.assertTrue( pluginInfos.stream() - .anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common" + ".tier.TieredSpilloverCachePlugin")) + .anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common.tier.TieredSpilloverCachePlugin")) ); } public void testSanityChecksWithIndicesRequestCache() throws InterruptedException { + internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%")).build()); Client client = client(); assertAcked( client.admin() @@ -118,10 +135,7 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio .setSize(0) .setSearchType(SearchType.QUERY_THEN_FETCH) .addAggregation( - dateHistogram("histo").field("f") - .timeZone(ZoneId.of("+01:00")) - .minDocCount(0) - .dateHistogramInterval(DateHistogramInterval.MONTH) + dateHistogram("histo").field("f").timeZone(ZoneId.of("+01:00")).minDocCount(0).calendarInterval(DateHistogramInterval.MONTH) ) .get(); assertSearchResponse(r1); @@ -133,6 +147,288 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio ); } + public void testWithDynamicTookTimePolicy() throws Exception { + int onHeapCacheSizeInBytes = 2000; + internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b")).build()); + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + ) + .get() + ); + // Step 1 : Set a very high value for took time policy so that no items evicted from onHeap cache are spilled + // to disk. And then hit requests so that few items are cached into cache. + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(100, TimeUnit.SECONDS) + ) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + int numberOfIndexedItems = randomIntBetween(6, 10); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + long perQuerySizeInCacheInBytes = -1; + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + if (perQuerySizeInCacheInBytes == -1) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes(); + } + assertSearchResponse(resp); + } + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + // Considering disk cache won't be used due to took time policy having a high value, we expect overall cache + // size to be less than or equal to onHeapCache size. + assertTrue(requestCacheStats.getMemorySizeInBytes() <= onHeapCacheSizeInBytes); + long entriesInCache = requestCacheStats.getMemorySizeInBytes() / perQuerySizeInCacheInBytes; + // All should be misses in the first attempt + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(numberOfIndexedItems - entriesInCache, requestCacheStats.getEvictions()); + assertEquals(0, requestCacheStats.getHitCount()); + + // Step 2: Again hit same set of queries as above, we still won't see any hits as items keeps getting evicted. + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + requestCacheStats = getRequestCacheStats(client, "index"); + // We still won't get any hits as items keep getting evicted in LRU fashion due to limited cache size. + assertTrue(requestCacheStats.getMemorySizeInBytes() <= onHeapCacheSizeInBytes); + assertEquals(numberOfIndexedItems * 2, requestCacheStats.getMissCount()); + assertEquals(numberOfIndexedItems * 2 - entriesInCache, requestCacheStats.getEvictions()); + assertEquals(0, requestCacheStats.getHitCount()); + long lastEvictionSeen = requestCacheStats.getEvictions(); + + // Step 3: Decrease took time policy to zero so that disk cache also comes into play. Now we should be able + // to cache all entries. + updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.MILLISECONDS) + ) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + requestCacheStats = getRequestCacheStats(client, "index"); + // All entries should get cached. + assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); + // No more evictions seen when compared with last step. + assertEquals(0, requestCacheStats.getEvictions() - lastEvictionSeen); + // Hit count should be equal to number of cache entries present in previous step. + assertEquals(entriesInCache, requestCacheStats.getHitCount()); + assertEquals(numberOfIndexedItems * 3 - entriesInCache, requestCacheStats.getMissCount()); + long lastHitCountSeen = requestCacheStats.getHitCount(); + long lastMissCountSeen = requestCacheStats.getMissCount(); + + // Step 4: Again hit the same requests, we should get hits for all entries. + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + requestCacheStats = getRequestCacheStats(client, "index"); + // All entries should get cached. + assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); + // No more evictions seen when compared with last step. + assertEquals(0, requestCacheStats.getEvictions() - lastEvictionSeen); + assertEquals(lastHitCountSeen + numberOfIndexedItems, requestCacheStats.getHitCount()); + assertEquals(0, lastMissCountSeen - requestCacheStats.getMissCount()); + } + + public void testInvalidationWithIndicesRequestCache() throws Exception { + int onHeapCacheSizeInBytes = 2000; + internalCluster().startNode( + Settings.builder() + .put(defaultSettings(onHeapCacheSizeInBytes + "b")) + .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1)) + .build() + ); + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + ) + .get() + ); + // Update took time policy to zero so that all entries are eligible to be cached on disk. + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.MILLISECONDS) + ) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + int numberOfIndexedItems = randomIntBetween(5, 10); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + long perQuerySizeInCacheInBytes = -1; + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + if (perQuerySizeInCacheInBytes == -1) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes(); + } + assertSearchResponse(resp); + } + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(0, requestCacheStats.getHitCount()); + assertEquals(0, requestCacheStats.getEvictions()); + assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes()); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache(); + assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes()); + assertEquals(0, requestCacheStats.getEvictions()); + // Explicit refresh would invalidate cache entries. + refreshAndWaitForReplication(); + assertBusy(() -> { + // Explicit refresh should clear up cache entries + assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0); + }, 1, TimeUnit.SECONDS); + requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache(); + assertEquals(0, requestCacheStats.getMemorySizeInBytes()); + // Hits and misses stats shouldn't get cleared up. + assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + } + + public void testWithExplicitCacheClear() throws Exception { + int onHeapCacheSizeInBytes = 2000; + internalCluster().startNode( + Settings.builder() + .put(defaultSettings(onHeapCacheSizeInBytes + "b")) + .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1)) + .build() + ); + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + ) + .get() + ); + // Update took time policy to zero so that all entries are eligible to be cached on disk. + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.MILLISECONDS) + ) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + int numberOfIndexedItems = randomIntBetween(5, 10); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + + long perQuerySizeInCacheInBytes = -1; + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + if (perQuerySizeInCacheInBytes == -1) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes(); + } + assertSearchResponse(resp); + } + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(0, requestCacheStats.getHitCount()); + assertEquals(0, requestCacheStats.getEvictions()); + assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes()); + + // Explicit clear the cache. + ClearIndicesCacheRequest request = new ClearIndicesCacheRequest("index"); + ClearIndicesCacheResponse response = client.admin().indices().clearCache(request).get(); + assertNoFailures(response); + + assertBusy(() -> { + // All entries should get cleared up. + assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0); + }, 1, TimeUnit.SECONDS); + } + + private RequestCacheStats getRequestCacheStats(Client client, String indexName) { + return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache(); + } + public static class MockDiskCachePlugin extends Plugin implements CachePlugin { public MockDiskCachePlugin() {} diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java index 96ef027c17187..4bc26803acf4c 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java @@ -13,12 +13,16 @@ package org.opensearch.cache.common.policy; +import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.policy.CachedQueryResult; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.unit.TimeValue; import java.util.function.Function; import java.util.function.Predicate; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; + /** * A cache tier policy which accepts queries whose took time is greater than some threshold. * The threshold should be set to approximately the time it takes to get a result from the cache tier. @@ -30,7 +34,7 @@ public class TookTimePolicy implements Predicate { /** * The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through. */ - private final TimeValue threshold; + private TimeValue threshold; /** * Function which extracts the relevant PolicyValues from a serialized CachedQueryResult @@ -41,13 +45,25 @@ public class TookTimePolicy implements Predicate { * Constructs a took time policy. * @param threshold the threshold * @param cachedResultParser the function providing policy values + * @param clusterSettings cluster settings + * @param cacheType cache type */ - public TookTimePolicy(TimeValue threshold, Function cachedResultParser) { + public TookTimePolicy( + TimeValue threshold, + Function cachedResultParser, + ClusterSettings clusterSettings, + CacheType cacheType + ) { if (threshold.compareTo(TimeValue.ZERO) < 0) { throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep()); } this.threshold = threshold; this.cachedResultParser = cachedResultParser; + clusterSettings.addSettingsUpdateConsumer(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType), this::setThreshold); + } + + private void setThreshold(TimeValue threshold) { + this.threshold = threshold; } /** diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 00a8eec93acc9..79629f7543fc7 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -15,19 +15,21 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ReleasableLock; -import org.opensearch.common.util.iterable.Iterables; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -47,6 +49,9 @@ @ExperimentalApi public class TieredSpilloverCache implements ICache { + // Used to avoid caching stale entries in lower tiers. + private static final List REMOVAL_REASONS_FOR_EVICTION = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY); + private final ICache diskCache; private final ICache onHeapCache; private final RemovalListener removalListener; @@ -69,8 +74,11 @@ public class TieredSpilloverCache implements ICache { @Override public void onRemoval(RemovalNotification notification) { try (ReleasableLock ignore = writeLock.acquire()) { - if (evaluatePolicies(notification.getValue())) { + if (REMOVAL_REASONS_FOR_EVICTION.contains(notification.getRemovalReason()) + && evaluatePolicies(notification.getValue())) { diskCache.put(notification.getKey(), notification.getValue()); + } else { + removalListener.onRemoval(notification); } } } @@ -81,6 +89,7 @@ public void onRemoval(RemovalNotification notification) { .setWeigher(builder.cacheConfig.getWeigher()) .setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes()) .setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess()) + .setClusterSettings(builder.cacheConfig.getClusterSettings()) .build(), builder.cacheType, builder.cacheFactories @@ -156,10 +165,10 @@ public void invalidateAll() { * Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache. * @return An iterable over (onHeap + disk) keys */ - @SuppressWarnings("unchecked") + @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public Iterable keys() { - return Iterables.concat(onHeapCache.keys(), diskCache.keys()); + return new ConcatenatedIterables(new Iterable[] { onHeapCache.keys(), diskCache.keys() }); } @Override @@ -213,6 +222,71 @@ boolean evaluatePolicies(V value) { return true; } + /** + * ConcatenatedIterables which combines cache iterables and supports remove() functionality as well if underlying + * iterator supports it. + * @param Type of key. + */ + class ConcatenatedIterables implements Iterable { + + final Iterable[] iterables; + + ConcatenatedIterables(Iterable[] iterables) { + this.iterables = iterables; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public Iterator iterator() { + Iterator[] iterators = (Iterator[]) new Iterator[iterables.length]; + for (int i = 0; i < iterables.length; i++) { + iterators[i] = iterables[i].iterator(); + } + return new ConcatenatedIterator<>(iterators); + } + + class ConcatenatedIterator implements Iterator { + private final Iterator[] iterators; + private int currentIteratorIndex; + private Iterator currentIterator; + + public ConcatenatedIterator(Iterator[] iterators) { + this.iterators = iterators; + this.currentIteratorIndex = 0; + this.currentIterator = iterators[currentIteratorIndex]; + } + + @Override + public boolean hasNext() { + // Check if the current iterator has next element + while (currentIterator.hasNext()) { + return true; + } + // If the current iterator is exhausted, switch to the next iterator + currentIteratorIndex++; + if (currentIteratorIndex < iterators.length) { + currentIterator = iterators[currentIteratorIndex]; + // Check if the switched iterator has next element + return hasNext(); + } + return false; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentIterator.next(); + } + + @Override + public void remove() { + currentIterator.remove(); + } + } + } + /** * Factory to create TieredSpilloverCache objects. */ @@ -253,8 +327,7 @@ public ICache create(CacheConfig config, CacheType cacheType, } ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName); - TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD - .getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType) .get(settings); Function cachedResultParser = Objects.requireNonNull( config.getCachedResultParser(), @@ -266,7 +339,7 @@ public ICache create(CacheConfig config, CacheType cacheType, .setRemovalListener(config.getRemovalListener()) .setCacheConfig(config) .setCacheType(cacheType) - .addPolicy(new TookTimePolicy(diskPolicyThreshold, cachedResultParser)) + .addPolicy(new TookTimePolicy(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType)) .build(); } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index 0cc8a711faaf5..dfd40199d859e 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -18,6 +18,8 @@ import java.util.List; import java.util.Map; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; + /** * Plugin for TieredSpilloverCache. */ @@ -51,11 +53,7 @@ public List> getSettings() { settingList.add( TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); - settingList.add( - TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace( - cacheType.getSettingPrefix() - ) - ); + settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); } return settingList; } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index 684307960b8a5..3df6c4c6a0771 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -8,9 +8,12 @@ package org.opensearch.cache.common.tier; +import org.opensearch.common.cache.CacheType; import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import static org.opensearch.common.settings.Setting.Property.NodeScope; @@ -42,17 +45,36 @@ public class TieredSpilloverCacheSettings { /** * Setting defining the minimum took time for a query to be allowed into the disk cache. */ - public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting( + private static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting( TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold", (key) -> Setting.timeSetting( key, new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting TimeValue.ZERO, // Minimum value for this setting - NodeScope + NodeScope, + Setting.Property.Dynamic ) ); - // 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range. - // Will be tuned further with future benchmarks. + + /** + * Stores took time policy settings for various cache types as these are dynamic so that can be registered and + * retrieved accordingly. + */ + public static final Map> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = getConcreteTookTimePolicySettings(); + + /** + * Fetches concrete took time policy settings. + */ + private static Map> getConcreteTookTimePolicySettings() { + Map> concreteTookTimePolicySettingMap = new HashMap<>(); + for (CacheType cacheType : CacheType.values()) { + concreteTookTimePolicySettingMap.put( + cacheType, + TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + ); + } + return concreteTookTimePolicySettingMap; + } /** * Default constructor diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java index 237c9c7b79db4..000067280e50d 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java @@ -12,20 +12,27 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; import org.opensearch.common.Randomness; +import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; import java.io.IOException; +import java.util.HashSet; import java.util.Random; import java.util.function.Function; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; + public class TookTimePolicyTests extends OpenSearchTestCase { private final Function transformationFunction = (data) -> { try { @@ -35,8 +42,17 @@ public class TookTimePolicyTests extends OpenSearchTestCase { } }; + private ClusterSettings clusterSettings; + + @Before + public void setup() { + Settings settings = Settings.EMPTY; + clusterSettings = new ClusterSettings(settings, new HashSet<>()); + clusterSettings.registerSetting(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)); + } + private TookTimePolicy getTookTimePolicy(TimeValue threshold) { - return new TookTimePolicy<>(threshold, transformationFunction); + return new TookTimePolicy<>(threshold, transformationFunction, clusterSettings, CacheType.INDICES_REQUEST_CACHE); } public void testTookTimePolicy() throws Exception { diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java index d8a6eb480a5a5..2a3b1caf73732 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java @@ -18,7 +18,9 @@ import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; +import java.util.Iterator; import java.util.Map; +import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentHashMap; public class MockDiskCache implements ICache { @@ -79,7 +81,12 @@ public void invalidateAll() { @Override public Iterable keys() { - return this.cache.keySet(); + return new Iterable() { + @Override + public Iterator iterator() { + return new CacheKeyIterator<>(cache, removalListener); + } + }; } @Override @@ -156,4 +163,48 @@ public Builder setValueSerializer(Serializer valueSerializer) { } } + + /** + * Provides a iterator over keys. + * @param Type of key + * @param Type of value + */ + class CacheKeyIterator implements Iterator { + private final Iterator> entryIterator; + private final Map cache; + private final RemovalListener removalListener; + private K lastKey; + + public CacheKeyIterator(Map cache, RemovalListener removalListener) { + this.entryIterator = cache.entrySet().iterator(); + this.removalListener = removalListener; + this.cache = cache; + } + + @Override + public boolean hasNext() { + return entryIterator.hasNext(); + } + + @Override + public K next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Map.Entry entry = entryIterator.next(); + lastKey = entry.getKey(); + return lastKey; + } + + @Override + public void remove() { + if (lastKey == null) { + throw new IllegalStateException("No element to remove"); + } + V value = cache.get(lastKey); + cache.remove(lastKey); + this.removalListener.onRemoval(new RemovalNotification<>(lastKey, value, RemovalReason.INVALIDATED)); + lastKey = null; + } + } } diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index b132952834f06..431aca51099a6 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -19,14 +19,17 @@ import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.UUID; @@ -38,10 +41,20 @@ import java.util.function.Function; import java.util.function.Predicate; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; public class TieredSpilloverCacheTests extends OpenSearchTestCase { + private ClusterSettings clusterSettings; + + @Before + public void setup() { + Settings settings = Settings.EMPTY; + clusterSettings = new ClusterSettings(settings, new HashSet<>()); + clusterSettings.registerSetting(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)); + } + public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception { int onHeapCacheSize = randomIntBetween(10, 30); int keyValueSize = 50; @@ -134,6 +147,7 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception .setSettings(settings) .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken // 20_000_000 ns = 20 ms to compute + .setClusterSettings(clusterSettings) .build(), CacheType.INDICES_REQUEST_CACHE, Map.of( @@ -959,9 +973,7 @@ public void testTookTimePolicyFromFactory() throws Exception { onHeapCacheSize * keyValueSize + "b" ) .put( - TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace( - CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() - ).getKey(), + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), new TimeValue(timeValueThresholdNanos / 1_000_000) ) .build(); @@ -979,6 +991,7 @@ public CachedQueryResult.PolicyValues apply(String s) { return new CachedQueryResult.PolicyValues(tookTimeMap.get(s)); } }) + .setClusterSettings(clusterSettings) .build(), CacheType.INDICES_REQUEST_CACHE, Map.of( @@ -1024,8 +1037,9 @@ public CachedQueryResult.PolicyValues apply(String s) { public void testMinimumThresholdSettingValue() throws Exception { // Confirm we can't set TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD to below // TimeValue.ZERO (for example, MINUS_ONE) - Setting concreteSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD - .getConcreteSettingForNamespace(CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()); + Setting concreteSetting = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get( + CacheType.INDICES_REQUEST_CACHE + ); TimeValue validDuration = new TimeValue(0, TimeUnit.MILLISECONDS); Settings validSettings = Settings.builder().put(concreteSetting.getKey(), validDuration).build(); diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index 4c9881e845d42..e537ece759e65 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -12,6 +12,7 @@ import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.serializer.Serializer; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -61,6 +62,8 @@ public class CacheConfig { */ private final TimeValue expireAfterAccess; + private final ClusterSettings clusterSettings; + private CacheConfig(Builder builder) { this.keyType = builder.keyType; this.valueType = builder.valueType; @@ -72,6 +75,7 @@ private CacheConfig(Builder builder) { this.cachedResultParser = builder.cachedResultParser; this.maxSizeInBytes = builder.maxSizeInBytes; this.expireAfterAccess = builder.expireAfterAccess; + this.clusterSettings = builder.clusterSettings; } public Class getKeyType() { @@ -114,6 +118,10 @@ public TimeValue getExpireAfterAccess() { return expireAfterAccess; } + public ClusterSettings getClusterSettings() { + return clusterSettings; + } + /** * Builder class to build Cache config related parameters. * @param Type of key. @@ -138,6 +146,7 @@ public static class Builder { private long maxSizeInBytes; private TimeValue expireAfterAccess; + private ClusterSettings clusterSettings; public Builder() {} @@ -191,6 +200,11 @@ public Builder setExpireAfterAccess(TimeValue expireAfterAccess) { return this; } + public Builder setClusterSettings(ClusterSettings clusterSettings) { + this.clusterSettings = clusterSettings; + return this; + } + public CacheConfig build() { return new CacheConfig<>(this); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8760ee3c94309..e5277d747f795 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -83,6 +83,7 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.settings.CacheSettings; +import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.network.NetworkService; @@ -747,6 +748,14 @@ public void apply(Settings value, Settings current, Settings previous) { TelemetrySettings.METRICS_FEATURE_ENABLED_SETTING ), List.of(FeatureFlags.PLUGGABLE_CACHE), - List.of(CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE)) + List.of( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE), + OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ), + OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ) + ) ); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 34c8d6cf5e840..607ff721bd357 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -39,6 +39,7 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.OpenSearchParseException; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; @@ -149,7 +150,8 @@ public final class IndicesRequestCache implements RemovalListener> cacheEntityFunction, CacheService cacheService, - ThreadPool threadPool + ThreadPool threadPool, + ClusterService clusterService ) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; @@ -179,6 +181,7 @@ public final class IndicesRequestCache implements RemovalListener Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool + threadPool, + ClusterServiceUtils.createClusterService(threadPool) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -160,7 +162,8 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), cacheService, - threadPool + threadPool, + ClusterServiceUtils.createClusterService(threadPool) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -228,7 +231,11 @@ public void testCacheDifferentReaders() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); + }), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -360,7 +367,11 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + }), + new CacheModule(new ArrayList<>(), settings).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -421,7 +432,11 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() th return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + }), + new CacheModule(new ArrayList<>(), settings).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -483,7 +498,11 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + }), + new CacheModule(new ArrayList<>(), settings).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -556,7 +575,11 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStal return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + }), + new CacheModule(new ArrayList<>(), settings).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -629,7 +652,11 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold( return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + }), + new CacheModule(new ArrayList<>(), settings).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -691,7 +718,11 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); + }), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -749,7 +780,8 @@ public void testEviction() throws Exception { Settings.EMPTY, (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool + threadPool, + ClusterServiceUtils.createClusterService(threadPool) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -780,7 +812,8 @@ public void testEviction() throws Exception { Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool + threadPool, + ClusterServiceUtils.createClusterService(threadPool) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -827,7 +860,11 @@ public void testClearAllEntityIdentity() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); + }), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -923,7 +960,11 @@ public void testInvalidate() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); + }), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); From eee2ca7a359b55df892d2e6e325bf435c9bdf842 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Wed, 3 Apr 2024 10:21:53 -0700 Subject: [PATCH 2/5] Add changelog Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 38246f566cf6e..fd6b2dc8c631d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -108,6 +108,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865)) - [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697)) - [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954)) +- [Tiered Caching] Make took time caching policy setting dynamic ([#13063](https://github.com/opensearch-project/OpenSearch/pull/13063)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) From 77720bd2e64c5dcf643f95b57b2391cc1a7410a6 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 4 Apr 2024 09:49:45 -0700 Subject: [PATCH 3/5] Addressing first set of comments Signed-off-by: Sagar Upadhyaya --- .../common/tier/TieredSpilloverCache.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 79629f7543fc7..0442a7d35ebf6 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -50,7 +50,7 @@ public class TieredSpilloverCache implements ICache { // Used to avoid caching stale entries in lower tiers. - private static final List REMOVAL_REASONS_FOR_EVICTION = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY); + private static final List SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY); private final ICache diskCache; private final ICache onHeapCache; @@ -74,7 +74,7 @@ public class TieredSpilloverCache implements ICache { @Override public void onRemoval(RemovalNotification notification) { try (ReleasableLock ignore = writeLock.acquire()) { - if (REMOVAL_REASONS_FOR_EVICTION.contains(notification.getRemovalReason()) + if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()) && evaluatePolicies(notification.getValue())) { diskCache.put(notification.getKey(), notification.getValue()); } else { @@ -165,10 +165,11 @@ public void invalidateAll() { * Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache. * @return An iterable over (onHeap + disk) keys */ - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "unchecked" }) @Override public Iterable keys() { - return new ConcatenatedIterables(new Iterable[] { onHeapCache.keys(), diskCache.keys() }); + Iterable[] iterables = (Iterable[]) new Iterable[] { onHeapCache.keys(), diskCache.keys() }; + return new ConcatenatedIterables(iterables); } @Override @@ -227,7 +228,7 @@ boolean evaluatePolicies(V value) { * iterator supports it. * @param Type of key. */ - class ConcatenatedIterables implements Iterable { + static class ConcatenatedIterables implements Iterable { final Iterable[] iterables; @@ -245,7 +246,7 @@ public Iterator iterator() { return new ConcatenatedIterator<>(iterators); } - class ConcatenatedIterator implements Iterator { + static class ConcatenatedIterator implements Iterator { private final Iterator[] iterators; private int currentIteratorIndex; private Iterator currentIterator; @@ -258,18 +259,14 @@ public ConcatenatedIterator(Iterator[] iterators) { @Override public boolean hasNext() { - // Check if the current iterator has next element - while (currentIterator.hasNext()) { - return true; - } - // If the current iterator is exhausted, switch to the next iterator - currentIteratorIndex++; - if (currentIteratorIndex < iterators.length) { + while (!currentIterator.hasNext()) { + currentIteratorIndex++; + if (currentIteratorIndex == iterators.length) { + return false; + } currentIterator = iterators[currentIteratorIndex]; - // Check if the switched iterator has next element - return hasNext(); } - return false; + return true; } @Override From 4a0a3574a55c835d452a1e3085b63c3b85372433 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 4 Apr 2024 10:11:52 -0700 Subject: [PATCH 4/5] Addressing comments Signed-off-by: Sagar Upadhyaya --- .../common/tier/TieredSpilloverCache.java | 4 +-- .../cache/common/tier/MockDiskCache.java | 25 ++++++++----------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 0442a7d35ebf6..cab05732ba1c4 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -236,10 +236,10 @@ static class ConcatenatedIterables implements Iterable { this.iterables = iterables; } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "unchecked" }) @Override public Iterator iterator() { - Iterator[] iterators = (Iterator[]) new Iterator[iterables.length]; + Iterator[] iterators = (Iterator[]) new Iterator[iterables.length]; for (int i = 0; i < iterables.length; i++) { iterators[i] = iterables[i].iterator(); } diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java index 2a3b1caf73732..548c5d846dda5 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java @@ -81,12 +81,7 @@ public void invalidateAll() { @Override public Iterable keys() { - return new Iterable() { - @Override - public Iterator iterator() { - return new CacheKeyIterator<>(cache, removalListener); - } - }; + return () -> new CacheKeyIterator<>(cache, removalListener); } @Override @@ -169,11 +164,11 @@ public Builder setValueSerializer(Serializer valueSerializer) { * @param Type of key * @param Type of value */ - class CacheKeyIterator implements Iterator { + static class CacheKeyIterator implements Iterator { private final Iterator> entryIterator; private final Map cache; private final RemovalListener removalListener; - private K lastKey; + private K currentKey; public CacheKeyIterator(Map cache, RemovalListener removalListener) { this.entryIterator = cache.entrySet().iterator(); @@ -192,19 +187,19 @@ public K next() { throw new NoSuchElementException(); } Map.Entry entry = entryIterator.next(); - lastKey = entry.getKey(); - return lastKey; + currentKey = entry.getKey(); + return currentKey; } @Override public void remove() { - if (lastKey == null) { + if (currentKey == null) { throw new IllegalStateException("No element to remove"); } - V value = cache.get(lastKey); - cache.remove(lastKey); - this.removalListener.onRemoval(new RemovalNotification<>(lastKey, value, RemovalReason.INVALIDATED)); - lastKey = null; + V value = cache.get(currentKey); + cache.remove(currentKey); + this.removalListener.onRemoval(new RemovalNotification<>(currentKey, value, RemovalReason.INVALIDATED)); + currentKey = null; } } } From e22be9aa70ee86b81a8b7dbb4415ff6f5d19f4a4 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 9 Apr 2024 09:36:39 -0700 Subject: [PATCH 5/5] Using a static block to initialize tiered caching setting Signed-off-by: Sagar Upadhyaya --- .../cache/common/tier/TieredSpilloverCacheSettings.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index 3df6c4c6a0771..b89e8c517a351 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -60,12 +60,12 @@ public class TieredSpilloverCacheSettings { * Stores took time policy settings for various cache types as these are dynamic so that can be registered and * retrieved accordingly. */ - public static final Map> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = getConcreteTookTimePolicySettings(); + public static final Map> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; /** * Fetches concrete took time policy settings. */ - private static Map> getConcreteTookTimePolicySettings() { + static { Map> concreteTookTimePolicySettingMap = new HashMap<>(); for (CacheType cacheType : CacheType.values()) { concreteTookTimePolicySettingMap.put( @@ -73,7 +73,7 @@ private static Map> getConcreteTookTimePolicySetti TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); } - return concreteTookTimePolicySettingMap; + TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap; } /**