diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCache.java b/evcache-core/src/main/java/com/netflix/evcache/EVCache.java index 5c8915dc..a3945796 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCache.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCache.java @@ -7,8 +7,6 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import javax.annotation.Nullable; @@ -18,12 +16,10 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.netflix.archaius.api.PropertyRepository; import com.netflix.evcache.EVCacheLatch.Policy; import com.netflix.evcache.operation.EVCacheItem; import com.netflix.evcache.operation.EVCacheItemMetaData; import com.netflix.evcache.pool.EVCacheClientPoolManager; -import com.netflix.evcache.util.EVCacheConfig; import net.spy.memcached.transcoders.Transcoder; import rx.Scheduler; @@ -70,7 +66,7 @@ * @author smadappa */ public interface EVCache { - + // TODO: Remove Async methods (Project rx) and rename COMPLETABLE_* with ASYNC_* public static enum Call { GET, GETL, GET_AND_TOUCH, ASYNC_GET, BULK, SET, DELETE, INCR, DECR, TOUCH, APPEND, PREPEND, REPLACE, ADD, APPEND_OR_ADD, GET_ALL, META_GET, META_SET, META_DEBUG, COMPLETABLE_FUTURE_GET, COMPLETABLE_FUTURE_GET_BULK @@ -475,13 +471,32 @@ EVCacheLatch replace(String key, T value, Transcoder tc, int timeToLive, * any more requests or issues during deserialization or any IO * Related issues * - * Note: If the data is replicated by zone, then we can the + * Note: If the data is replicated by zone, then we can get the * value from the zone local to the client. If we cannot find * this value then null is returned. This is transparent to the * users. */ T get(String key) throws EVCacheException; + /** + * Async Retrieve the value for the given key. + * + * @param key + * key to get. Ensure the key is properly encoded and does not + * contain whitespace or control characters. The max length of the key (including prefix) + * is 250 characters. + * @return the Value for the given key from the cache (null if there is + * none). + * @throws EVCacheException + * in the rare circumstance where queue is too full to accept + * any more requests or issues during deserialization or any IO + * Related issues + * + * Note: If the data is replicated by zone, then we can get the + * value from the zone local to the client. If we cannot find + * this value then null is returned. This is transparent to the + * users. + */ CompletableFuture getAsync(String key) throws EVCacheException; /** * Retrieve the value for the given key. @@ -514,7 +529,7 @@ EVCacheLatch replace(String key, T value, Transcoder tc, int timeToLive, * any more requests or issues during deserialization or any IO * Related issues * - * Note: If the data is replicated by zone, then we can the + * Note: If the data is replicated by zone, then we can get the * value from the zone local to the client. If we cannot find * this value then null is returned. This is transparent to the * users. @@ -522,7 +537,7 @@ EVCacheLatch replace(String key, T value, Transcoder tc, int timeToLive, T get(String key, Transcoder tc) throws EVCacheException; /** - * Retrieve the value for the given a key using the specified Transcoder for + * Async Retrieve the value for the given a key using the specified Transcoder for * deserialization. * * @param key @@ -538,7 +553,7 @@ EVCacheLatch replace(String key, T value, Transcoder tc, int timeToLive, * any more requests or issues during deserialization or any IO * Related issues * - * Note: If the data is replicated by zone, then we can the + * Note: If the data is replicated by zone, then we can get the * value from the zone local to the client. If we cannot find * this value then null is returned. This is transparent to the * users. @@ -585,7 +600,7 @@ default EVCacheItemMetaData metaDebug(String key) throws EVCacheException { * any more requests or issues during deserialization or any IO * Related issues * - * Note: If the data is replicated by zone, then we can the + * Note: If the data is replicated by zone, then we can get the * value from the zone local to the client. If we cannot find * this value we retry other zones, if still not found, then null is returned. */ @@ -614,7 +629,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * any more requests or issues during deserialization or any IO * Related issues * - * Note: If the data is replicated by zone, then we can the + * Note: If the data is replicated by zone, then we can get the * value from the zone local to the client. If we cannot find * this value then null is returned. This is transparent to the * users. @@ -628,7 +643,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param key * key to get. Ensure the key is properly encoded and does not * contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param tc * the Transcoder to deserialize the data * @param scheduler @@ -645,7 +660,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param key * key to get. Ensure the key is properly encoded and does not * contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param timeToLive * the new expiration of this object i.e. less than 30 days in * seconds or the exact expiry time as UNIX time @@ -663,7 +678,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param key * key to get. Ensure the key is properly encoded and does not * contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param timeToLive * the new expiration of this object i.e. less than 30 days in * seconds or the exact expiry time as UNIX time @@ -684,7 +699,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param key * the key to get. Ensure the key is properly encoded and does * not contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param timeToLive * the new expiration of this object i.e. less than 30 days in * seconds or the exact expiry time as UNIX time @@ -702,7 +717,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param key * the key to get. Ensure the key is properly encoded and does * not contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param timeToLive * the new expiration of this object i.e. less than 30 days in * seconds or the exact expiry time as UNIX time @@ -722,7 +737,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param keys * the keys for which we need the values. Ensure each key is properly encoded and does * not contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @return a map of the values (for each value that exists). If the Returned * map contains the key but the value in null then the key does not * exist in the cache. if a key is missing then we were not able to @@ -734,7 +749,23 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE */ Map getBulk(String... keys) throws EVCacheException; - CompletableFuture> getBulkCompletableFuture(String... keys) throws EVCacheException, ExecutionException, InterruptedException; + /** + * Async Retrieve the value of a set of keys. + * + * @param keys + * the keys for which we need the values. Ensure each key is properly encoded and does + * not contain whitespace or control characters. The max length of the key (including prefix) + * is 200 characters. + * @return a map of the values (for each value that exists). If the Returned + * map contains the key but the value in null then the key does not + * exist in the cache. if a key is missing then we were not able to + * retrieve the data for that key due to some exception + * @throws EVCacheException + * in the rare circumstance where queue is too full to accept + * any more requests or issues during deserialization or any IO + * Related issues + */ + CompletableFuture> getAsyncBulk(String... keys); /** * Retrieve the value for a set of keys, using a specified Transcoder for @@ -743,7 +774,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param keys * keys to which we need the values.Ensure each key is properly encoded and does * not contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param tc * the transcoder to use for deserialization * @return a map of the values (for each value that exists). If the Returned @@ -764,7 +795,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param keys * The collection of keys for which we need the values. Ensure each key is properly encoded and does * not contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @return a map of the values (for each value that exists). If the Returned * map contains the key but the value in null then the key does not * exist in the cache. if a key is missing then we were not able to @@ -783,7 +814,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param keys * The collection of keys for which we need the values. Ensure each key is properly encoded and does * not contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param tc * the transcoder to use for deserialization * @return a map of the values (for each value that exists). If the Returned @@ -804,7 +835,7 @@ default EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheE * @param keys * The collection of keys for which we need the values. Ensure each key is properly encoded and does * not contain whitespace or control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param tc * the transcoder to use for deserialization * @param timeToLive @@ -830,7 +861,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key for which we need the value. Ensure the key is * properly encoded and does not contain whitespace or control * characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @return the Futures containing the Value or null. * @throws EVCacheException * in the circumstance where queue is too full to accept any @@ -850,7 +881,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key for which we need the value. Ensure the key is * properly encoded and does not contain whitespace or control * characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param tc * the transcoder to use for deserialization * @return the Futures containing the Value or null. @@ -871,7 +902,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key. Ensure the key is * properly encoded and does not contain whitespace or control * characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param by * the amount to increment * @param def @@ -893,7 +924,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key. Ensure the key is * properly encoded and does not contain whitespace or control * characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param by * the amount to decrement * @param def @@ -917,7 +948,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key under which this object should be appended. Ensure the * key is properly encoded and does not contain whitespace or * control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param T * the value to be appended * @param tc @@ -944,7 +975,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key under which this object should be appended. Ensure the * key is properly encoded and does not contain whitespace or * control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param T * the value to be appended * @param timeToLive @@ -969,7 +1000,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key which this object should be added to. Ensure the * key is properly encoded and does not contain whitespace or * control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param T * the value to be added * @param tc @@ -997,7 +1028,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key which this object should be added to. Ensure the * key is properly encoded and does not contain whitespace or * control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param T * the value to be added * @param tc @@ -1029,7 +1060,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key to touch. Ensure the * key is properly encoded and does not contain whitespace or * control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param ttl * the new expiration time in seconds * @@ -1050,7 +1081,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key to touch. Ensure the * key is properly encoded and does not contain whitespace or * control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param ttl * the new expiration time in seconds * @@ -1078,7 +1109,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key under which this object should be appended or Added. Ensure the * key is properly encoded and does not contain whitespace or * control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param T * the value to be appended * @param tc @@ -1105,7 +1136,7 @@ Map getBulkAndTouch(Collection keys, Transcoder tc, in * the key under which this object should be appended or Added. Ensure the * key is properly encoded and does not contain whitespace or * control characters. The max length of the key (including prefix) - * is 250 characters. + * is 200 characters. * @param T * the value to be appended * @param tc diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index 73803fec..f59cf730 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -13,14 +13,22 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import javax.management.MBeanServer; import javax.management.ObjectName; import com.netflix.evcache.dto.KeyMapDto; -import com.netflix.evcache.util.*; +import com.netflix.evcache.util.EVCacheBulkDataDto; +import com.netflix.evcache.util.KeyHasher; +import com.netflix.evcache.util.RetryCount; +import com.netflix.evcache.util.Sneaky; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -536,10 +544,10 @@ private T getInMemory(EVCacheKey evcKey, Transcoder tc) throws Exception final Transcoder transcoder = (tc == null) ? ((_transcoder == null) ? (Transcoder) _pool.getEVCacheClientForRead().getTranscoder() : (Transcoder) _transcoder) : tc; T value = getInMemoryCache(transcoder).get(evcKey); if (value != null) { - if (log.isDebugEnabled() && shouldLog()) log.debug("Value retrieved from inmemory cache for APP " + _appName + ", key : " + evcKey + (log.isTraceEnabled() ? "; value : " + value : "")); + if (log.isDebugEnabled() && shouldLog()) log.debug("Value retrieved from in-memory cache for APP " + _appName + ", key : " + evcKey + (log.isTraceEnabled() ? "; value : " + value : "")); return value; } else { - if (log.isInfoEnabled() && shouldLog()) log.info("Value not_found in inmemory cache for APP " + _appName + ", key : " + evcKey + "; value : " + value ); + if (log.isInfoEnabled() && shouldLog()) log.info("Value not_found in in-memory cache for APP " + _appName + ", key : " + evcKey + "; value : " + value ); } } catch (Exception e) { return handleInMemoryException(e); @@ -551,6 +559,9 @@ private T getInMemory(EVCacheKey evcKey, Transcoder tc) throws Exception private CompletableFuture getAsyncInMemory(EVCacheKey evcKey, Transcoder tc) { CompletableFuture promise = new CompletableFuture<>(); try { + if(log.isDebugEnabled() && shouldLog()) { + log.debug("Retrieving value from memory {} ", evcKey.getKey()); + } T t = getInMemory(evcKey, tc); promise.complete(t); } catch (Exception ex) { @@ -563,6 +574,7 @@ private T handleInMemoryException(Exception e) throws Exception { final boolean throwExc = doThrowException(); if(throwExc) { if(e.getCause() instanceof DataNotFoundException) { + if (log.isDebugEnabled() && shouldLog()) log.debug("DataNotFoundException while getting data from InMemory Cache", e); return null; } if(e.getCause() instanceof EVCacheException) { @@ -572,6 +584,7 @@ private T handleInMemoryException(Exception e) throws Exception { throw new EVCacheException("ExecutionException", e); } } else { + if (log.isDebugEnabled() && shouldLog()) log.debug("Throws Exception is false and returning null in this case"); return null; } } @@ -583,13 +596,20 @@ private CompletableFuture doAsyncGet(EVCacheKey evcKey, Transcoder tc) EVCacheClient client = buildEvCacheClient(throwExc, Call.COMPLETABLE_FUTURE_GET, errorFuture); if (errorFuture.isCompletedExceptionally() || client == null) { if (client == null ) { + if (log.isDebugEnabled() && shouldLog()) log.debug("client is null"); errorFuture.complete(null); } return errorFuture; } + if (log.isDebugEnabled() && shouldLog()) log.debug("Completed Building the client"); //Building the start event - EVCacheEvent event = buildAndStartEvent(client, Collections.singletonList(evcKey), throwExc, errorFuture); + EVCacheEvent event = buildAndStartEvent(client, + Collections.singletonList(evcKey), + throwExc, + errorFuture, + Call.COMPLETABLE_FUTURE_GET); if (errorFuture.isCompletedExceptionally()) { + if (log.isDebugEnabled() && shouldLog()) log.debug("Error while building and starting the event"); return errorFuture; } errorFuture.cancel(false); @@ -631,14 +651,15 @@ private EVCacheClient buildEvCacheClient(boolean throwExc, Call callType, Co } private EVCacheEvent buildAndStartEvent(EVCacheClient client, - List evcKeys, - boolean throwExc, - CompletableFuture completableFuture) { - EVCacheEvent event = createEVCacheEvent(Collections.singletonList(client), Call.COMPLETABLE_FUTURE_GET); + List evcKeys, + boolean throwExc, + CompletableFuture completableFuture, + Call callType) { + EVCacheEvent event = createEVCacheEvent(Collections.singletonList(client), callType); if (event != null) { event.setEVCacheKeys(evcKeys); if (shouldThrottle(event)) { - incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.COMPLETABLE_FUTURE_GET); + incrementFastFail(EVCacheMetricsFactory.THROTTLED, callType); if (throwExc) completableFuture.completeExceptionally(new EVCacheException("Request Throttled for app " + _appName + " & keys " + evcKeys)); return null; @@ -698,11 +719,20 @@ private T handleFinally(T data, private void handleException(Throwable ex, EVCacheEvent event) { if (ex.getCause() instanceof RuntimeException) { + if (log.isDebugEnabled() && shouldLog()) { + log.debug("Handling exception with cause ", ex.getCause()); + } Throwable runTimeCause = ex.getCause(); if (runTimeCause.getCause() instanceof ExecutionException) { + if (log.isDebugEnabled() && shouldLog()) { + log.debug("Handling ExecutionException with cause ",runTimeCause.getCause()); + } Throwable executionExceptionCause = runTimeCause.getCause(); if (executionExceptionCause.getCause() instanceof net.spy.memcached.internal.CheckedOperationTimeoutException) { if (event != null) { + if (log.isDebugEnabled() && shouldLog()) { + log.debug("Setting Status as Timeout"); + } event.setStatus(EVCacheMetricsFactory.TIMEOUT); eventError(event, ex); } @@ -711,6 +741,9 @@ private void handleException(Throwable ex, EVCacheEvent event) { } } if (event != null) { + if (log.isDebugEnabled() && shouldLog()) { + log.debug("Setting event as Error"); + } event.setStatus(EVCacheMetricsFactory.ERROR); eventError(event, ex); } @@ -761,7 +794,9 @@ private CompletableFuture handleRetries(List fbClients, } public CompletableFuture handleSuccessCompletion(T s, EVCacheKey key, List fbClients, int index, RetryCount retryCount) { - log.debug("fetched the key {} from server {} and retry count {}", key.getKey(), fbClients.get(index).getServerGroup().getName(), retryCount.get()); + if (log.isDebugEnabled() && shouldLog()) { + log.debug("fetched the key {} from server {} and retry count {}", key.getKey(), fbClients.get(index).getServerGroup().getName(), retryCount.get()); + } return CompletableFuture.completedFuture(s); } @@ -1239,11 +1274,17 @@ private CompletableFuture getAsyncData(EVCacheClient client, String canonicalKey = evcKey.getCanonicalKey(client.isDuetClient()); if (hashKey != null) { + if (log.isDebugEnabled() && shouldLog()) { + log.debug("Fetching data with hashKey {} ", hashKey); + } return client.getAsync(hashKey, evcacheValueTranscoder) .thenApply(val -> getData(transcoder, canonicalKey, val)) .exceptionally(ex -> handleClientException(hashKey, ex)); } else { + if (log.isDebugEnabled() && shouldLog()) { + log.debug("Fetching data with canonicalKey {} ", canonicalKey); + } return client.getAsync(canonicalKey, transcoder) .exceptionally(ex -> handleClientException(canonicalKey, ex)); } @@ -1860,6 +1901,9 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient final Map keyMap = keyMapDto.getKeyMap(); boolean hasHashedKey = keyMapDto.isKeyHashed(); if (hasHashedKey) { + if (log.isDebugEnabled() && shouldLog()) { + log.debug("fetching bulk data with hashedKey {} ",evcacheKeys); + } return client.getAsyncBulk(keyMap.keySet(), evcacheValueTranscoder) .thenApply(data -> buildHashedKeyValueResult(data, tc, client, keyMap)) .exceptionally(t -> handleBulkException(t, evcacheKeys)); @@ -1870,6 +1914,9 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient } else { tcCopy = tc; } + if (log.isDebugEnabled() && shouldLog()) { + log.debug("fetching bulk data with non hashedKey {} ",keyMap.keySet()); + } return client.getAsyncBulk(keyMap.keySet(), tcCopy ) .thenApply(data -> buildNonHashedKeyValueResult(data, keyMap)) .exceptionally(t -> handleBulkException(t, evcacheKeys)); @@ -2258,6 +2305,7 @@ private Map getBulk(final Collection keys, Transcoder } private CompletableFuture> handleBulkInMemory(Collection keys, Transcoder tc) { + if (log.isDebugEnabled() && shouldLog()) log.debug("handleBulkInMemory with keys {} " + keys); final Map decanonicalR = new HashMap<>((keys.size() * 4) / 3 + 1); final List evcKeys = new ArrayList<>(); CompletableFuture> promise = new CompletableFuture<>(); @@ -2283,17 +2331,18 @@ private EVCacheBulkDataDto handleBulkInMemory(Collection keys, log.debug("Value retrieved from inmemory cache for APP " + _appName + ", key : " + evcKey + (log.isTraceEnabled() ? "; value : " + value : "")); } else { + if (log.isDebugEnabled() && shouldLog()) log.debug("Key not present in in memory {} " + k); evcKeys.add(evcKey); } } return new EVCacheBulkDataDto<>(decanonicalR, evcKeys); } - public CompletableFuture> getBulkCompletableFuture(String... keys) throws ExecutionException, InterruptedException { - return this.getBulkCompletableFuture(Arrays.asList(keys), (Transcoder) _transcoder); + public CompletableFuture> getAsyncBulk(String... keys) { + return this.getAsyncBulk(Arrays.asList(keys), (Transcoder) _transcoder); } - private CompletableFuture> getBulkCompletableFuture(final Collection keys, Transcoder tc) throws ExecutionException, InterruptedException { + private CompletableFuture> getAsyncBulk(final Collection keys, Transcoder tc) { if (null == keys) throw new IllegalArgumentException(); if (keys.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap()); return handleBulkInMemory(keys, tc) @@ -2305,17 +2354,29 @@ private CompletableFuture> doAsyncGetBulk(Collection EVCacheBulkDataDto dto) { // all keys handled by in memory if(dto.getEvcKeys().size() == 0 && dto.getDecanonicalR().size() == keys.size()) { - if (log.isDebugEnabled() && shouldLog()) log.debug("All Values retrieved from inmemory cache for APP " + _appName + ", keys : " + keys); + if (log.isDebugEnabled() && shouldLog()) log.debug("All Values retrieved from in-memory cache for APP " + _appName + ", keys : " + keys); return CompletableFuture.completedFuture(dto.getDecanonicalR()); } - final boolean throwExc = doThrowException(); - EVCacheClient client = _pool.getEVCacheClientForRead(); - if (client == null) { - return null; + CompletableFuture> errorFuture = new CompletableFuture<>(); + EVCacheClient client = buildEvCacheClient(throwExc, Call.COMPLETABLE_FUTURE_GET_BULK, errorFuture); + if (errorFuture.isCompletedExceptionally() || client == null) { + if (client == null ) { + if (log.isDebugEnabled() && shouldLog()) log.debug("doAsyncGetBulk is null"); + errorFuture.complete(null); + } + return errorFuture; + } + if (log.isDebugEnabled() && shouldLog()) log.debug("Completed Building the client for doAsyncGetBulk"); + //Building the start event + EVCacheEvent event = buildAndStartEvent(client, dto.getEvcKeys(), throwExc, errorFuture, Call.COMPLETABLE_FUTURE_GET_BULK); + if (errorFuture.isCompletedExceptionally()) { + if (log.isDebugEnabled() && shouldLog()) log.debug("Error while building and starting the event for doAsyncGetBulk"); + return errorFuture; } + if (log.isDebugEnabled() && shouldLog()) log.debug("Cancelling the error future"); + errorFuture.cancel(false); - final EVCacheEvent event = null; final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); StringBuilder status = new StringBuilder(EVCacheMetricsFactory.SUCCESS); StringBuilder cacheOperation = new StringBuilder(EVCacheMetricsFactory.YES); @@ -2422,6 +2483,9 @@ private CompletableFuture> handleFullRetry(EVCacheClient Transcoder tc, RetryCount retryCount) { final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); + if (log.isInfoEnabled() && shouldLog()) { + log.info("Fetching the clients for retry {}", fbClients); + } return handleFullBulkRetries(fbClients, 0, event, evcKeys, tc, retryCount); } @@ -2432,8 +2496,15 @@ private CompletableFuture> handleFullBulkRetries(List tc, RetryCount retryCount) { if (fbClientIndex >= fbClients.size()) { + if (log.isInfoEnabled() && shouldLog()) { + log.debug("Clients exhausted so returning the future with null result for keys {}", evcKeys); + } return CompletableFuture.completedFuture(null); } + if (log.isInfoEnabled() && shouldLog()) { + EVCacheClient evCacheClient = fbClients.get(fbClientIndex); + log.debug("Trying to fetching the data from server group {} client {} and keys {}", evCacheClient.getServerGroupName(), evCacheClient.getId(), evcKeys); + } CompletableFuture> future = getAsyncBulkData(fbClients.get(fbClientIndex), event, evcKeys, tc); int nextIndex = fbClientIndex + 1; retryCount.incr(); @@ -2452,8 +2523,17 @@ private CompletableFuture> handleBulkRetry(Map CompletableFuture getNext(CompletableFuture future, scheduledTimeout = LazySharedExecutor.executor.schedule( () -> { + if(log.isDebugEnabled()) log.debug("Throwing timeout exception after {} {}", timeout, unit); future.completeExceptionally(new TimeoutException("Timeout after " + timeout)); }, splitTimeout, @@ -293,7 +294,7 @@ private static CompletableFuture getNext(CompletableFuture future, (r, exp) -> { if (exp == null) { scheduledTimeout.cancel(false); - log.debug("completing the future"); + if(log.isDebugEnabled()) log.debug("completing the future"); next.complete(null); } }); @@ -306,15 +307,19 @@ public CompletableFuture makeFutureWithTimeout(long timeout, TimeUnit uni return withTimeout(future, timeout, units); } - private CompletableFuture handleTimeoutException() { + private void handleTimeoutException() { + if(log.isDebugEnabled()) log.debug("handling the timeout in handleTimeoutException"); MemcachedConnection.opTimedOut(op); if (op != null) op.timeOut(); ExecutionException t = null; if (op.isTimedOut()) { + if(log.isDebugEnabled()) log.debug("Checked Operation timed out with operation {}.", op); t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); } else if (op.isCancelled()) { + if(log.isDebugEnabled()) log.debug("Cancelled with operation {}.", op); t = new ExecutionException(new CancellationException("Cancelled")); } else if (op.hasErrored() ) { + if(log.isDebugEnabled()) log.debug("Other exception with operation {}.", op); t = new ExecutionException(op.getException()); } throw new RuntimeException(t); @@ -331,7 +336,7 @@ public CompletableFuture getAsync(long timeout, TimeUnit units) { }); } - private EVCacheGetOperationListener doAsyncGet(CompletableFuture cf) { + private void doAsyncGet(CompletableFuture cf) { EVCacheGetOperationListener listener = future -> { try { T result = future.get(); @@ -341,7 +346,6 @@ private EVCacheGetOperationListener doAsyncGet(CompletableFuture cf) { } }; this.addListener(listener); - return listener; } public Single get(long duration, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) { diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index 4f0b89d6..690f6cdc 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -862,9 +862,10 @@ public long decr(String key, long by, long defaultVal, int timeToLive) throws EV } public CompletableFuture getAsync(String key, Transcoder tc) { - return evcacheMemcachedClient - .asyncGet(key, tc, null) - .getAsync(readTimeout.get(), TimeUnit.MILLISECONDS); + if(log.isDebugEnabled()) log.debug("fetching data getAsync {}", key); + return evcacheMemcachedClient + .asyncGet(key, tc, null) + .getAsync(readTimeout.get(), TimeUnit.MILLISECONDS); } public T get(String key, Transcoder tc, boolean _throwException, boolean hasZF, boolean chunked) throws Exception { @@ -985,11 +986,11 @@ public Map getBulk(Collection _canonicalKeys, Transcoder< } public CompletableFuture> getAsyncBulk(Collection _canonicalKeys, Transcoder tc) { - final Collection canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.BULK); + final Collection canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.COMPLETABLE_FUTURE_GET_BULK); if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient .asyncGetBulk(canonicalKeys, tc, null) - .getAsyncSome(5, TimeUnit.MILLISECONDS); + .getAsyncSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS); } diff --git a/evcache-core/src/test/java/com/netflix/evcache/test/Base.java b/evcache-core/src/test/java/com/netflix/evcache/test/Base.java index f1d2256c..39949c5c 100644 --- a/evcache-core/src/test/java/com/netflix/evcache/test/Base.java +++ b/evcache-core/src/test/java/com/netflix/evcache/test/Base.java @@ -256,8 +256,8 @@ public Map getBulk(String keys[], EVCache gCache) throws Excepti return value; } - public Map getCompletableBulk(String keys[], EVCache gCache) throws Exception { - final CompletableFuture> value = gCache.getBulkCompletableFuture(keys); + public Map getAsyncBulk(String keys[], EVCache gCache) throws Exception { + final CompletableFuture> value = gCache.getAsyncBulk(keys); if(log.isDebugEnabled()) log.debug("getBulk : keys : " + Arrays.toString(keys) + "; values = " + value); return value.get(); } diff --git a/evcache-core/src/test/java/com/netflix/evcache/test/SimpleEVCacheTest.java b/evcache-core/src/test/java/com/netflix/evcache/test/SimpleEVCacheTest.java index 4d450784..1fcc736a 100644 --- a/evcache-core/src/test/java/com/netflix/evcache/test/SimpleEVCacheTest.java +++ b/evcache-core/src/test/java/com/netflix/evcache/test/SimpleEVCacheTest.java @@ -228,7 +228,7 @@ public void testCompletableFutureBulk() throws Exception { for (int i = 0; i < keys.length; i++) { keys[i] = "key_" + i; } - Map vals = getCompletableBulk(keys, evCache); + Map vals = getAsyncBulk(keys, evCache); assertTrue(!vals.isEmpty()); for (int i = 0; i < vals.size(); i++) { String key = "key_" + i;