From 60f3fc14497041d7e1b4a0f77a0e2e55a123e0f7 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Thu, 8 Feb 2024 08:49:56 +0100 Subject: [PATCH] Eliminate WATCH Command in absence of Optimistic Locking In scenarios where optimistic locking is disabled, it's unnecessary to use the WATCH command, especially when the subsequent update operation won't be conducted within a transaction (MULTI). Furthermore, in cases where the value is in the cache and optimistic locking is enabled, invoking the UNWATCH command is required to ensure the connection ceases to monitor the key. It's worth noting that within transactions executed (EXEC command) or aborted (ABORT command), Redis autonomously handles the UNWATCH operation, removing the need for explicit invocation. --- .../RedisCacheWithOptimisticLockingTest.java | 135 ++++++++++++++++++ .../cache/redis/runtime/RedisCacheImpl.java | 44 ++++-- 2 files changed, 171 insertions(+), 8 deletions(-) create mode 100644 extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/RedisCacheWithOptimisticLockingTest.java diff --git a/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/RedisCacheWithOptimisticLockingTest.java b/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/RedisCacheWithOptimisticLockingTest.java new file mode 100644 index 0000000000000..10362aa07c000 --- /dev/null +++ b/extensions/redis-cache/deployment/src/test/java/io/quarkus/cache/redis/deployment/RedisCacheWithOptimisticLockingTest.java @@ -0,0 +1,135 @@ +package io.quarkus.cache.redis.deployment; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.List; +import java.util.Optional; + +import jakarta.inject.Inject; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.arc.Arc; +import io.quarkus.cache.Cache; +import io.quarkus.cache.CacheManager; +import io.quarkus.cache.redis.runtime.RedisCache; +import io.quarkus.redis.datasource.RedisDataSource; +import io.quarkus.test.QuarkusUnitTest; + +public class RedisCacheWithOptimisticLockingTest { + + private static final String KEY_1 = "1"; + private static final String KEY_2 = "2"; + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .withApplicationRoot(jar -> jar.addClasses(SimpleCachedService.class, TestUtil.class)) + .overrideRuntimeConfigKey("quarkus.cache.redis.use-optimistic-locking", "true"); + + @Inject + SimpleCachedService simpleCachedService; + + @Test + public void testTypes() { + CacheManager cacheManager = Arc.container().select(CacheManager.class).get(); + assertNotNull(cacheManager); + + Optional cacheOpt = cacheManager.getCache(SimpleCachedService.CACHE_NAME); + assertTrue(cacheOpt.isPresent()); + + Cache cache = cacheOpt.get(); + assertTrue(cache instanceof RedisCache); + } + + @Test + public void testAllCacheAnnotations() { + RedisDataSource redisDataSource = Arc.container().select(RedisDataSource.class).get(); + List allKeysAtStart = TestUtil.allRedisKeys(redisDataSource); + + // STEP 1 + // Action: @CacheResult-annotated method call. + // Expected effect: method invoked and result cached. + // Verified by: STEP 2. + String value1 = simpleCachedService.cachedMethod(KEY_1); + List newKeys = TestUtil.allRedisKeys(redisDataSource); + assertEquals(allKeysAtStart.size() + 1, newKeys.size()); + Assertions.assertThat(newKeys).contains(expectedCacheKey(KEY_1)); + + // STEP 2 + // Action: same call as STEP 1. + // Expected effect: method not invoked and result coming from the cache. + // Verified by: same object reference between STEPS 1 and 2 results. + String value2 = simpleCachedService.cachedMethod(KEY_1); + assertEquals(value1, value2); + assertEquals(allKeysAtStart.size() + 1, + TestUtil.allRedisKeys(redisDataSource).size()); + + // STEP 3 + // Action: same call as STEP 2 with a new key. + // Expected effect: method invoked and result cached. + // Verified by: different objects references between STEPS 2 and 3 results. + String value3 = simpleCachedService.cachedMethod(KEY_2); + assertNotEquals(value2, value3); + newKeys = TestUtil.allRedisKeys(redisDataSource); + assertEquals(allKeysAtStart.size() + 2, newKeys.size()); + Assertions.assertThat(newKeys).contains(expectedCacheKey(KEY_1), expectedCacheKey(KEY_2)); + + // STEP 4 + // Action: cache entry invalidation. + // Expected effect: STEP 2 cache entry removed. + // Verified by: STEP 5. + simpleCachedService.invalidate(KEY_1); + newKeys = TestUtil.allRedisKeys(redisDataSource); + assertEquals(allKeysAtStart.size() + 1, newKeys.size()); + Assertions.assertThat(newKeys).contains(expectedCacheKey(KEY_2)).doesNotContain(expectedCacheKey(KEY_1)); + + // STEP 5 + // Action: same call as STEP 2. + // Expected effect: method invoked because of STEP 4 and result cached. + // Verified by: different objects references between STEPS 2 and 5 results. + String value5 = simpleCachedService.cachedMethod(KEY_1); + assertNotEquals(value2, value5); + newKeys = TestUtil.allRedisKeys(redisDataSource); + assertEquals(allKeysAtStart.size() + 2, newKeys.size()); + Assertions.assertThat(newKeys).contains(expectedCacheKey(KEY_1), expectedCacheKey(KEY_2)); + + // STEP 6 + // Action: same call as STEP 3. + // Expected effect: method not invoked and result coming from the cache. + // Verified by: same object reference between STEPS 3 and 6 results. + String value6 = simpleCachedService.cachedMethod(KEY_2); + assertEquals(value3, value6); + assertEquals(allKeysAtStart.size() + 2, + TestUtil.allRedisKeys(redisDataSource).size()); + + // STEP 7 + // Action: full cache invalidation. + // Expected effect: empty cache. + // Verified by: STEPS 8 and 9. + simpleCachedService.invalidateAll(); + newKeys = TestUtil.allRedisKeys(redisDataSource); + assertEquals(allKeysAtStart.size(), newKeys.size()); + Assertions.assertThat(newKeys).doesNotContain(expectedCacheKey(KEY_1), expectedCacheKey(KEY_2)); + + // STEP 8 + // Action: same call as STEP 5. + // Expected effect: method invoked because of STEP 7 and result cached. + // Verified by: different objects references between STEPS 5 and 8 results. + String value8 = simpleCachedService.cachedMethod(KEY_1); + assertNotEquals(value5, value8); + + // STEP 9 + // Action: same call as STEP 6. + // Expected effect: method invoked because of STEP 7 and result cached. + // Verified by: different objects references between STEPS 6 and 9 results. + String value9 = simpleCachedService.cachedMethod(KEY_2); + assertNotEquals(value6, value9); + } + + private static String expectedCacheKey(String key) { + return "cache:" + SimpleCachedService.CACHE_NAME + ":" + key; + } + +} diff --git a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java index 2a60256424809..e7e39822c351f 100644 --- a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java +++ b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java @@ -146,24 +146,40 @@ public Uni get(K key, Class clazz, Function valueLoader) { // With optimistic locking: // WATCH K // val = deserialize(GET K) - // If val == null - // MULTI - // SET K computation.apply(K) - // EXEC + // if val == null + // MULTI + // SET K computation.apply(K) + // EXEC + // else + // UNWATCH K + // return val // Without: // val = deserialize(GET K) // if (val == null) => SET K computation.apply(K) + // else => return val byte[] encodedKey = marshaller.encode(computeActualKey(encodeKey(key))); boolean isWorkerThread = blockingAllowedSupplier.get(); return withConnection(new Function>() { @Override public Uni apply(RedisConnection connection) { - return watch(connection, encodedKey) - .chain(new GetFromConnectionSupplier<>(connection, clazz, encodedKey, marshaller)) + Uni startingPoint; + if (cacheInfo.useOptimisticLocking) { + startingPoint = watch(connection, encodedKey) + .chain(new GetFromConnectionSupplier<>(connection, clazz, encodedKey, marshaller)); + } else { + startingPoint = new GetFromConnectionSupplier<>(connection, clazz, encodedKey, marshaller).get(); + } + + return startingPoint .chain(Unchecked.function(new UncheckedFunction<>() { @Override public Uni apply(V cached) throws Exception { if (cached != null) { + // Unwatch if optimistic locking + if (cacheInfo.useOptimisticLocking) { + return connection.send(Request.cmd(Command.UNWATCH)) + .replaceWith(cached); + } return Uni.createFrom().item(new StaticSupplier<>(cached)); } else { Uni uni = computeValue(key, valueLoader, isWorkerThread); @@ -210,10 +226,22 @@ public Uni getAsync(K key, Class clazz, Function> valueLo return withConnection(new Function>() { @Override public Uni apply(RedisConnection connection) { - return watch(connection, encodedKey) - .chain(new GetFromConnectionSupplier<>(connection, clazz, encodedKey, marshaller)) + Uni startingPoint; + if (cacheInfo.useOptimisticLocking) { + startingPoint = watch(connection, encodedKey) + .chain(new GetFromConnectionSupplier<>(connection, clazz, encodedKey, marshaller)); + } else { + startingPoint = new GetFromConnectionSupplier<>(connection, clazz, encodedKey, marshaller).get(); + } + + return startingPoint .chain(cached -> { if (cached != null) { + // Unwatch if optimistic locking + if (cacheInfo.useOptimisticLocking) { + return connection.send(Request.cmd(Command.UNWATCH)) + .replaceWith(cached); + } return Uni.createFrom().item(new StaticSupplier<>(cached)); } else { Uni getter = valueLoader.apply(key);