
Add result level caching to Brokers #5028

Merged · 42 commits · Mar 24, 2018

Conversation

@a2l007 a2l007 commented Oct 31, 2017

Based on proposal #4843 , this introduces result level caching in brokers. It uses the etag functionality to identify whether a result cache entry exists for a specific query. This is independent of segment level caching, so both types of caching can be configured independently. A new query runner, ResultLevelCachingQueryRunner, performs this caching. The result level cache is populated after the merge, using the query key as the cache key; the merged result is saved as the cache value along with the etag information.
Since query results may be large, a configurable parameter resultLevelCacheLimit limits the size of the query response that can be cached.
One caveat to be noted is that basic object deserialization is performed while retrieving post aggregated values from the cache, so this may not work with post aggregators that require a custom deserialization method.
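The merge-then-cache flow described above hinges on a stable result-level cache key derived from the query's cache key plus the segments the query covers. A minimal, hedged sketch of such an etag-style key in plain Java (the class and method names here are illustrative; the PR's actual computation, computeCurrentEtag, uses Guava's Hashing.sha1()):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Illustrative sketch: hash the query cache key together with the identifiers
// of the covered segments, so the key changes whenever either changes.
public class ResultCacheKeySketch
{
  public static String computeEtag(byte[] queryCacheKey, Iterable<String> segmentIds)
      throws NoSuchAlgorithmException
  {
    MessageDigest hasher = MessageDigest.getInstance("SHA-1");
    hasher.update(queryCacheKey);
    for (String segmentId : segmentIds) {
      // Explicit UTF-8 keeps the key stable across JVM default charsets.
      hasher.update(segmentId.getBytes(StandardCharsets.UTF_8));
    }
    return Base64.getEncoder().encodeToString(hasher.digest());
  }
}
```

The same query over the same segments always yields the same etag, so a cached entry can be reused until the underlying segments change.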

@@ -37,6 +37,8 @@
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
public static final boolean DEFAULT_USE_CACHE = true;
public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true;
Contributor:
New feature should start off disabled

Contributor Author:

Would it be sufficient to keep it disabled via the populateResultCache parameter?
ba78816#diff-661a9dbd25633c651845a9ecab0ddbaaR44

Contributor:

It is confusing to have default here be true but the default in CacheConfig be false. But this just follows the convention of the prior caching settings.

Can you add a comment in CacheConfig that the defaults as stated in QueryContexts are different due to legacy reasons, and should probably be made the same at some point in the future?

Contributor Author:

Added comment as suggested.

@drcrallen (Contributor):

cool! Any chance you can get empirical evidence on the hit rates for this feature?

}
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache();
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
Sequence<Object> cachedSequence = new BaseSequence<>(
Member:

It could be Sequences.simple(() -> {...lambda, returning iterator...})

List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
addSequencesFromServer(sequencesByInterval, segmentsByServer);
return Sequences
.simple(sequencesByInterval)
.flatMerge(seq -> seq, query.getResultOrdering());
}).map(r -> {
Member:

Please refactor as a separate method

)
{
return new Cache.NamedKey(
resultLevelCacheIdentifier, resultLevelCacheIdentifier.getBytes()
Contributor:

StringUtils.toUtf8 instead of getBytes is suggested
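The concern behind this suggestion is that String.getBytes() with no argument uses the JVM's platform default charset, so the bytes backing a cache key could differ between brokers. A tiny sketch of the deterministic conversion (mirroring what a helper like StringUtils.toUtf8 would do; this is a stand-in, not Druid's actual implementation):

```java
import java.nio.charset.StandardCharsets;

// Always encode cache-key strings with an explicit charset so the resulting
// bytes are identical on every JVM, regardless of its default charset.
public class Utf8KeyDemo
{
  public static byte[] toUtf8(String s)
  {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```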

@Nullable
final byte[] queryCacheKey = computeQueryCacheKey();
@Nullable
final String queryResultKey = computeCurrentEtag(segments, queryCacheKey);
Contributor:

This now computes the etag for every query, regardless of whether caching is enabled.

Contributor:

Can this only be computed if it will actually be used?

@Nullable
final byte[] cachedResultSet = fetchFromResultLevelCache(queryResultKey);
if (cachedResultSet != null) {
log.info("Fetching entire result set from cache");
Contributor:

This should be debug, not info. That's a lot of logging to have on every query.

if (currentEtag != null && currentEtag.equals(prevEtag)) {
return Sequences.empty();
}
}

@Nullable
final byte[] cachedResultSet = fetchFromResultLevelCache(queryResultKey);
Contributor:

This should only happen if using the results cache is enabled.

final List<Pair<Interval, byte[]>> alreadyCachedResults = pruneSegmentsWithCachedResults(queryCacheKey, segments);
final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segments);
return new LazySequence<>(() -> {
return Sequences.wrap(new LazySequence<>(() -> {
Contributor:

How does this impact the lazy evaluation? Where is the cache work done? in the jetty thread?

Contributor Author:

Lazy evaluation continues to work as is. The wrapped after() method runs after all the results have been accumulated, and it populates the cache in the Jetty thread.

});
}

private T cacheResultEntry(T result, String queryResultKey)
Contributor:

as per the other methods, this should return null immediately if queryResultKey is null

Contributor Author:

The ResultLevelCachePopulator entry is created only if the result cache is enabled, and this method adds future entries only if the populator entry is present, so this method is harmless when the result cache is disabled.

);
}
catch (IOException e) {
throw new RuntimeException(e);
Contributor:

Failing to fetch or parse from cache should NOT cause a complete failure of the query. It should continue on and do what it can to compute things.
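The fallback behavior being asked for here might look like the following generic sketch (not the PR's actual code): swallow the cache failure and recompute, rather than propagating the exception.

```java
import java.util.function.Supplier;

// Hedged illustration of the review point: a cache read failure should
// degrade to recomputing the result, never fail the whole query.
public class CacheFallback
{
  public static <T> T fetchOrCompute(Supplier<T> cacheFetch, Supplier<T> compute)
  {
    try {
      T cached = cacheFetch.get();
      if (cached != null) {
        return cached;
      }
    }
    catch (RuntimeException e) {
      // A real implementation would log the failure here; the point is
      // that execution falls through to recomputation.
    }
    return compute.get();
  }
}
```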

Contributor:

I'm not completely clear where this will be evaluated, and if the exception is caught and ignored or not.

@@ -66,7 +75,9 @@ public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key
gen.writeObject(result);
}
}

if (cacheLimit != 0 && bytes.size() > cacheLimit) {
Contributor:

can this be cacheLimit > 0 and pass in -1 when you don't want to do it?
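The sentinel convention suggested here can be sketched as follows (a hypothetical helper, assuming a negative limit means "no limit check at this call site", so callers that don't want result-level size limiting pass -1 instead of 0):

```java
// With the reviewer's suggested convention, the size check only fires for a
// strictly positive limit; -1 disables it explicitly at a given call site.
public class CacheLimitSketch
{
  public static boolean exceedsLimit(long resultSizeBytes, int cacheLimit)
  {
    return cacheLimit > 0 && resultSizeBytes > cacheLimit;
  }
}
```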

@@ -181,7 +181,7 @@ public void run()
public void run()
{
try {
CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get());
CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get(), 0);
Contributor:

as per above, suggest making this -1

@@ -46,6 +52,9 @@
private int cacheBulkMergeLimit = Integer.MAX_VALUE;

@JsonProperty
private int resultLevelCacheLimit = 10485760;
Contributor:

why not Integer.MAX_VALUE by default?

@@ -351,7 +351,8 @@ private void testUseCache(
cache,
objectMapper,
cacheKey,
Iterables.transform(expectedResults, cacheStrategy.prepareForCache())
Iterables.transform(expectedResults, cacheStrategy.prepareForCache()),
Contributor:

as per above, suggest -1

Contributor Author:

Fixed

@drcrallen (Contributor):

This is a neat feature but I don't know how much battle testing the etag computation code paths have. Some of the underlying code looks racy in ways that can cause hash computation failures. As such, since this is also a new feature, I suggest making sure the code paths will not touch any new code (or will immediately return null) unless this feature is enabled.

@drcrallen (Contributor) commented Nov 3, 2017:

The racy part I'm not sure about is that the etag computation uses ServerSelector's pick() method, which may have an underlying change in discovery state after the initial server selection. But fixing or changing that is beyond the scope of this PR.

@drcrallen (Contributor):

Also please make sure there's a coherent story on what thread is doing computation. It is a notoriously tricky thing to track, and cache populating is actually a very compute intensive operation.

@a2l007 a2l007 changed the title Add result level caching to Brokers [WIP] Add result level caching to Brokers Nov 3, 2017
@a2l007 (Contributor Author) commented Nov 3, 2017:

@drcrallen I'm currently testing the changes on one of our internal clusters and should be able to give you a better response once it is done. I have marked this PR in progress for now.

@drcrallen drcrallen added the WIP label Nov 3, 2017
@himanshug (Contributor) left a comment:

I think we should do the result level caching business at the very end of query execution by adding a ResultLevelCachingQueryRunner at the very top of the query runner chain, as in #4852 for setting up context.
The major reason is to have a clean separation from existing classes. Also, I'm not sure whether the current implementation caches raw sketches or finalized values ... it should definitely cache finalized values if it doesn't already.

@@ -104,6 +104,9 @@ You can optionally only configure caching to be enabled on the broker by setting
|--------|---------------|-----------|-------|
|`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
|`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false|
|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false|
|`druid.broker.cache.resultLevelCacheLimit`|positive integer or 0|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`|
Contributor:

What would be the meaning of setting it to 0? That would disable the cache.
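For context, a broker configuration that turns on the settings documented in the table above might look like the following fragment (values are illustrative, not recommendations):

```properties
# Enable both segment level and result level caching on the broker.
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.broker.cache.useResultLevelCache=true
druid.broker.cache.populateResultLevelCache=true
# Cap cached responses at 10 MiB (10485760 bytes, an earlier default in this PR).
druid.broker.cache.resultLevelCacheLimit=10485760
```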

@@ -375,6 +423,7 @@ private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable
Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true;
for (ServerToSegment p : segments) {
log.info(p.getServer().pick().getServer().getType().toString());
Contributor:

this looks like it was left here by accident.

@himanshug (Contributor) commented Nov 15, 2017:

@drcrallen The current etag feature is being used in some Druid clusters at Oath, and some users have result level caches built using the etag feature outside of Druid already. The result level cache hit ratio typically varies from 50% to 80% depending upon the use case.
Another major advantage of the result level cache is that you cache the finalized values of sketches rather than the sketches themselves, which take a lot of space if stored inside the cache.

Regarding the thread used for computation, I think we should ensure that cache population happens inside the thread that calls ResultLevelCachingQueryRunner.run(..) (I think it's going to be the Jetty thread), which should be introduced instead of changing CachingClientQueryRunner too much.

if (newResultCacheKeyFromEtag != null && newResultCacheKeyFromEtag.equals(prevResultCacheKeyFromEtag)) {
return Sequences.empty();
}
}
Contributor:

This kind of short circuiting is already done by checking QueryResource.HEADER_IF_NONE_MATCH in the query context. Can we reuse that and not add this block?

Contributor Author:

Fixed as per suggestion.

@a2l007 (Contributor Author) commented Feb 15, 2018:

@drcrallen could you please review

@drcrallen (Contributor):

Yes, I'll put it on my list.

@@ -15,6 +15,8 @@ The query context is used for various query configuration parameters. The follow
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses druid.broker.cache.populateCache or druid.historical.cache.populateCache to determine whether or not to save the results of this query to the query cache |
|useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useResultLevelCache to determine whether or not to read from the query cache |
Contributor:

as per below, I thought this defaulted to false?

Contributor Author:

Fixed the docs.

"ResultLevelCachePopulator cannot be null during cache population"
);
if (thrown != null) {
log.error(
Contributor:

(minor) why not pass the error as a parameter to error()?

}
}
catch (IOException ex) {
log.error("Failed to retrieve entry to be cached. Result Level caching will not be performed!");
Contributor:

should probably log error here

Contributor:

aka, include the exception in the error parameters for the logger call

@drcrallen (Contributor) left a comment:

This is looking good! The main thing I would like to see addressed is whether there's a way to make the result level caching call fit in with the fluent query runner. The switch from fluent style to delegation style is kind of jarring.

baseClientRunner,
retryConfig,
objectMapper
return new ResultLevelCachingQueryRunner<>(
Contributor:

this feels like it violates the fluent workflow, is there a way to make this work inline with the fluent query runner?

* @return A function that does the inverse of the operation that the function prepareForCache returns
*/
Function<CacheType, T> pullFromCache();
Function<CacheType, T> pullFromCache(boolean isResultLevelCache);
Contributor:

Since a lot of calls default to false, would it make sense to add another method that just calls pullFromCache(false) and prepareForCache(false) ? and preserve backwards compat?

@@ -91,37 +91,42 @@ public ClientQuerySegmentWalker(
private <T> QueryRunner<T> makeRunner(Query<T> query, QueryRunner<T> baseClientRunner)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
return new ResultLevelCachingQueryRunner<>(makeRunner(query, baseClientRunner, toolChest),
Contributor Author:

@drcrallen Does this look ok? I have refactored it a bit but it doesn't exactly follow the fluent style.
Incorporating ResultLevelCachingQueryRunner inside the FluentQueryRunnerBuilder is tricky because the query runner logic needs to keep track of cache value data before and after the baseRunner.run is invoked. Further this query runner is not accessible inside the fluent query runner builder, which makes it a bit more complex.
I have addressed your other comments as well.

Contributor:

The use of io.druid.query.FluentQueryRunnerBuilder.FluentQueryRunner#emitCPUTimeMetric in the fluent query runner builder has a similar need. If you make a method in the fluent builder that takes the missing parameters (query, objectmapper, cache, cacheconfig), does it work?

Contributor Author:

@drcrallen Tried that but FluentQueryRunnerBuilder which resides in druid-processing does not have access to ResultLevelCachingQueryRunner as it (along with most of the caching logic) resides in druid-server package.
I attempted to refactor the caching logic into druid-processing, but there are several other dependencies that may have to be moved. I could work on a follow up improvement PR to investigate and perform the refactoring and hopefully make the makeRunner method cleaner.

@drcrallen (Contributor) commented Mar 22, 2018:

That must be quite frustrating >.<

Can you clarify this in an issue, and put a link to the issue in a comment above this code.

That way anyone coming in later and wondering why it is like this can have a clear logic path for what needs to change to fix things up.

Contributor Author:

@drcrallen I've made the requested changes.

@drcrallen (Contributor):

Sweet! @a2l007 thanks for sticking this out.

@@ -426,6 +427,11 @@ public Object apply(Row input)
for (AggregatorFactory agg : aggs) {
Member:

@a2l007

final List<Object> retVal = Lists.newArrayListWithCapacity(1 + dims.size() + aggs.size());

a few lines above is not updated with respect to getPostAggregatorSpecs. There are similar problems in other classes, updated in this PR. Also, I noticed this sizing was already buggy somewhere before this PR.
