Add result level caching to Brokers #5028
Conversation
@@ -37,6 +37,8 @@
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
public static final boolean DEFAULT_USE_CACHE = true;
public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true;
New feature should start off disabled
Would it be sufficient to keep it disabled via the populateResultCache parameter?
ba78816#diff-661a9dbd25633c651845a9ecab0ddbaaR44
It is confusing to have the default here be true but the default in CacheConfig be false. But this just follows the convention of the prior caching settings. Can you add a comment in CacheConfig that the defaults as stated in QueryContexts are different due to legacy reasons, and should probably be made the same at some point in the future?
Added comment as suggested.
cool! Any chance you can get empirical evidence on the hit rates for this feature?
}
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache();
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
Sequence<Object> cachedSequence = new BaseSequence<>( |
It could be Sequences.simple(() -> {...lambda, returning iterator...})
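For illustration, that suggestion might look roughly like this (a sketch only, reusing the cachedResultSet and cacheObjectClazz names from the surrounding diff; it relies on Iterable being a functional interface, so the lambda stands in for the anonymous BaseSequence):

    // Sketch: Iterable<Object> has a single abstract method, iterator(),
    // so a lambda returning the deserializing iterator is enough.
    Sequence<Object> cachedSequence = Sequences.simple(() -> {
      try {
        return objectMapper.readValues(
            objectMapper.getFactory().createParser(cachedResultSet),
            cacheObjectClazz
        );
      }
      catch (IOException e) {
        throw new RuntimeException(e);
      }
    });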
List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
addSequencesFromServer(sequencesByInterval, segmentsByServer);
return Sequences
    .simple(sequencesByInterval)
    .flatMerge(seq -> seq, query.getResultOrdering());
}).map(r -> { |
Please refactor as a separate method
)
{
return new Cache.NamedKey(
    resultLevelCacheIdentifier, resultLevelCacheIdentifier.getBytes()
StringUtils.toUtf8 instead of getBytes is suggested.
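Concretely, the suggested change would look something like this (assuming Druid's StringUtils utility; unlike String.getBytes(), which uses the platform default charset, StringUtils.toUtf8 always encodes UTF-8):

    return new Cache.NamedKey(
        resultLevelCacheIdentifier,
        StringUtils.toUtf8(resultLevelCacheIdentifier) // deterministic UTF-8 encoding
    );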
@Nullable
final byte[] queryCacheKey = computeQueryCacheKey();
@Nullable
final String queryResultKey = computeCurrentEtag(segments, queryCacheKey);
This now computes the etag for every query, regardless of whether caching is enabled.
Can this only be computed if it will actually be used?
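One possible shape for that guard (a sketch; useResultCache and populateResultCache are placeholder flags for however the cache config is actually consulted):

    @Nullable
    final String queryResultKey = (useResultCache || populateResultCache)
        ? computeCurrentEtag(segments, queryCacheKey)
        : null; // skip the SHA-1 hashing entirely when result caching is off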
@Nullable
final byte[] cachedResultSet = fetchFromResultLevelCache(queryResultKey);
if (cachedResultSet != null) {
  log.info("Fetching entire result set from cache");
This should be debug, not info. That's a lot of logging to have on every query.
if (currentEtag != null && currentEtag.equals(prevEtag)) {
  return Sequences.empty();
}
}

@Nullable
final byte[] cachedResultSet = fetchFromResultLevelCache(queryResultKey);
This should only happen if use of the results cache is enabled.
final List<Pair<Interval, byte[]>> alreadyCachedResults = pruneSegmentsWithCachedResults(queryCacheKey, segments);
final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segments);
return new LazySequence<>(() -> {
return Sequences.wrap(new LazySequence<>(() -> {
How does this impact the lazy evaluation? Where is the cache work done? In the Jetty thread?
Lazy evaluation would continue to work as it does now. The wrapped after() method runs after all the results have been accumulated, and populates the cache in the Jetty thread.
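A minimal sketch of that pattern, assuming Druid's Sequences.wrap/SequenceWrapper API (populateResults() is a hypothetical stand-in for the actual cache write):

    return Sequences.wrap(
        resultSequence,
        new SequenceWrapper()
        {
          @Override
          public void after(boolean isDone, Throwable thrown)
          {
            // Runs on the thread that consumed the sequence (the Jetty thread
            // for broker queries) once all results have been accumulated.
            if (isDone && thrown == null) {
              resultLevelCachePopulator.populateResults();
            }
          }
        }
    );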
});
}

private T cacheResultEntry(T result, String queryResultKey)
as per the other methods, this should return null immediately if queryResultKey is null
The ResultLevelCachePopulator entry will be created only if the result cache is enabled, and this method will add future entries only if the populator entry is present. This method would be harmless if the result cache is disabled.
);
}
catch (IOException e) {
  throw new RuntimeException(e);
Failing to fetch or parse from cache should NOT cause a complete failure of the query. It should continue on and do what it can to compute things.
I'm not completely clear where this will be evaluated, and whether the exception is caught and ignored or not.
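A sketch of the degradation being asked for above, where a failed cache read or parse is treated as a cache miss instead of a query failure (names follow the surrounding diff):

    byte[] cachedResultSet = null;
    try {
      cachedResultSet = fetchFromResultLevelCache(queryResultKey);
    }
    catch (RuntimeException e) {
      log.warn(e, "Failed to read from the result level cache; recomputing results");
    }
    // A null cachedResultSet falls through to normal query execution.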
@@ -66,7 +75,9 @@ public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key
  gen.writeObject(result);
}
}

if (cacheLimit != 0 && bytes.size() > cacheLimit) {
Can this be cacheLimit > 0, with -1 passed in when you don't want to do it?
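That is, something along these lines (a sketch of the suggested convention):

    // A non-positive limit (e.g. the suggested -1) means "never check the size".
    if (cacheLimit > 0 && bytes.size() > cacheLimit) {
      return; // result too large to cache
    }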
@@ -181,7 +181,7 @@ public void run()
public void run()
{
  try {
    CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get());
    CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get(), 0);
as per above, suggest making this -1
@@ -46,6 +52,9 @@
private int cacheBulkMergeLimit = Integer.MAX_VALUE;

@JsonProperty
private int resultLevelCacheLimit = 10485760;
Why not Integer.MAX_VALUE by default?
@@ -351,7 +351,8 @@ private void testUseCache(
cache,
objectMapper,
cacheKey,
Iterables.transform(expectedResults, cacheStrategy.prepareForCache())
Iterables.transform(expectedResults, cacheStrategy.prepareForCache()),
as per above, suggest -1
Fixed
This is a neat feature but I don't know how much battle testing the etag computation code paths have. Some of the underlying code looks racy in ways that can cause hash computation failures. As such, since this is also a new feature, I suggest making sure the code paths will not touch any new code (or will immediately
The racy part I'm not sure about is that the etag computation uses
Also please make sure there's a coherent story on what thread is doing computation. It is a notoriously tricky thing to track, and cache populating is actually a very compute intensive operation.
@drcrallen I'm currently testing the changes on one of our internal clusters and should be able to give you a better response once it is done. I have marked this PR in progress for now.
I think we should do the result level caching business at the very end of query execution by adding a ResultLevelCachingQueryRunner at the very top of the query runner chain, as in #4852 for setting up context.
The major reason is to have clean separation from existing classes. Also I'm not sure whether the current implementation caches raw sketches or finalized values; it should definitely cache finalized values if not already.
docs/content/configuration/broker.md (Outdated)
@@ -104,6 +104,9 @@ You can optionally only configure caching to be enabled on the broker by setting
|--------|---------------|-----------|-------|
|`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
|`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false|
|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false|
|`druid.broker.cache.resultLevelCacheLimit`|positive integer or 0|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`|
What would be the meaning of setting it to 0? That would disable the cache.
@@ -375,6 +423,7 @@ private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable
Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true;
for (ServerToSegment p : segments) {
  log.info(p.getServer().pick().getServer().getType().toString());
This looks like it was left here by accident.
@drcrallen the current etag feature is being used in some Druid clusters at Oath, and some users have already built result level caches outside of Druid using the etag feature. The result level cache hit ratio typically varies from 50% to 80% depending upon the use case. Regarding the thread used for computation, I think we should ensure that cache population activity happens inside the thread that calls ResultLevelCachingQueryRunner.run(..) (I think it's going to be the Jetty thread), which should be introduced instead of changing CachingClientQueryRunner too much.
if (newResultCacheKeyFromEtag != null && newResultCacheKeyFromEtag.equals(prevResultCacheKeyFromEtag)) {
  return Sequences.empty();
}
} |
This kind of short circuiting is already done by checking QueryResource.HEADER_IF_NONE_MATCH in the query context. Can we reuse the same and not add this block?
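For context, the existing short circuit being referred to looks roughly like this (a sketch; the exact context key handling is an assumption):

    final String prevEtag = (String) query.getContextValue(QueryResource.HEADER_IF_NONE_MATCH);
    if (prevEtag != null && prevEtag.equals(currentEtag)) {
      // The client already holds the results for this etag.
      return Sequences.empty();
    }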
Fixed as per suggestion.
@drcrallen could you please review?
Yes, I'll put it on my list.
@@ -15,6 +15,8 @@ The query context is used for various query configuration parameters. The follow
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses druid.broker.cache.populateCache or druid.historical.cache.populateCache to determine whether or not to save the results of this query to the query cache |
|useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useResultLevelCache to determine whether or not to read from the query cache |
As per below, I thought this defaulted to false?
Fixed the docs.
"ResultLevelCachePopulator cannot be null during cache population" | ||
); | ||
if (thrown != null) { | ||
log.error( |
(minor) why not put the error as a parameter to error()?
}
}
catch (IOException ex) {
  log.error("Failed to retrieve entry to be cached. Result Level caching will not be performed!");
should probably log error here
aka, include the exception in the error parameters for the logger call
This is looking good! The main thing I would like to see addressed is whether there's a way to make the result level caching call fit in with the fluent query runner. The switch from fluent style to delegation style is kind of jarring.
baseClientRunner,
retryConfig,
objectMapper
return new ResultLevelCachingQueryRunner<>(
This feels like it violates the fluent workflow. Is there a way to make this work inline with the fluent query runner?
* @return A function that does the inverse of the operation that the function prepareForCache returns
*/
Function<CacheType, T> pullFromCache();
Function<CacheType, T> pullFromCache(boolean isResultLevelCache);
Since a lot of calls default to false, would it make sense to add another method that just calls pullFromCache(false) and prepareForCache(false), and preserve backwards compat?
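For illustration, that backwards-compatible shape could be a pair of default methods on the interface (a sketch, assuming Java 8 default methods are acceptable here):

    // Old no-arg signatures preserved as defaults delegating to the new overloads.
    default Function<T, CacheType> prepareForCache()
    {
      return prepareForCache(false);
    }

    default Function<CacheType, T> pullFromCache()
    {
      return pullFromCache(false);
    }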
@@ -91,37 +91,42 @@ public ClientQuerySegmentWalker(
private <T> QueryRunner<T> makeRunner(Query<T> query, QueryRunner<T> baseClientRunner)
{
  QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
  return new ResultLevelCachingQueryRunner<>(makeRunner(query, baseClientRunner, toolChest),
@drcrallen Does this look OK? I have refactored it a bit, but it doesn't exactly follow the fluent style.
Incorporating ResultLevelCachingQueryRunner inside the FluentQueryRunnerBuilder is tricky because the query runner logic needs to keep track of cache value data before and after baseRunner.run is invoked. Further, this query runner is not accessible inside the fluent query runner builder, which makes it a bit more complex.
I have addressed your other comments as well.
The io.druid.query.FluentQueryRunnerBuilder.FluentQueryRunner#emitCPUTimeMetric use in the fluent query runner builder has a similar need. If you make a method in the fluent builder that takes the missing parameters (query, objectMapper, cache, cacheConfig), does it work?
@drcrallen Tried that, but FluentQueryRunnerBuilder, which resides in druid-processing, does not have access to ResultLevelCachingQueryRunner, as it (along with most of the caching logic) resides in the druid-server package.
I attempted to refactor the caching logic into druid-processing, but there are several other dependencies that may have to be moved. I could work on a follow-up improvement PR to investigate and perform the refactoring, and hopefully make the makeRunner method cleaner.
That must be quite frustrating >.<
Can you clarify this in an issue, and put a link to the issue in a comment above this code.
That way anyone coming in later and wondering why it is like this can have a clear logic path for what needs to change to fix things up.
@drcrallen I've made the requested changes.
Sweet! @a2l007 thanks for sticking this out.
@@ -426,6 +427,11 @@ public Object apply(Row input)
for (AggregatorFactory agg : aggs) {
The line final List<Object> retVal = Lists.newArrayListWithCapacity(1 + dims.size() + aggs.size()); a few lines above is not updated wrt getPostAggregatorSpecs. Similar problems exist in other classes updated in this PR. Also, I noticed that this sizing was already buggy somewhere before this PR.
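A sketch of the sizing fix being described, with the post-aggregators counted in the initial capacity (postAggs standing in for the result of getPostAggregatorSpecs()):

    // Reserve room for the timestamp, dimensions, aggregators, and post-aggregators.
    final List<Object> retVal = Lists.newArrayListWithCapacity(
        1 + dims.size() + aggs.size() + postAggs.size()
    );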
Based on proposal #4843, this introduces result level caching in brokers. It uses the etag functionality to identify the existence of a result cache entry for a specific query. This is independent of the segment level caching, and therefore both types of caching can be configured independently. A new query runner, ResultLevelCachingQueryRunner, performs this caching. The result level cache is populated after the merge, using the query key as the cache key; the merged result is saved as the cache value along with the etag information.
Since query results may be large, there is a configurable parameter resultLevelCacheLimit that limits the size of the query response that can be cached. One caveat to be noted is that basic object deserialization is performed while retrieving post aggregated values from the cache. Therefore this may not work with post aggregators that require a custom deserialization method.
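For reference, enabling the feature on a broker would look something like this (values are illustrative; the property names come from the broker.md table quoted above):

    # Example broker runtime.properties (illustrative values)
    druid.broker.cache.useResultLevelCache=true
    druid.broker.cache.populateResultLevelCache=true
    # Maximum size in bytes of a query response that may be cached.
    druid.broker.cache.resultLevelCacheLimit=10485760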