Add result level caching to Brokers #5028

Merged (42 commits, Mar 24, 2018)

Commits
ba78816  Add result level caching to Brokers (Oct 31, 2017)
7cb33cd  Minor doc changes (Oct 31, 2017)
83ee76e  Simplify sequences (Nov 1, 2017)
24a7595  Move etag execution (Nov 10, 2017)
d2861b9  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Nov 10, 2017)
130ee63  Modify cacheLimit criteria (Nov 10, 2017)
efeb2b2  Fix incorrect etag computation (Nov 10, 2017)
08b4feb  Fix docs (Nov 10, 2017)
3639c0a  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Nov 18, 2017)
e1d9175  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Nov 22, 2017)
c805b92  Add separate query runner for result level caching (Nov 22, 2017)
d81d81c  Update docs (Nov 22, 2017)
d738fbc  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Nov 27, 2017)
7e3492c  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 4, 2017)
fc69327  Add post aggregated results to result level cache (Dec 4, 2017)
0d409ea  Fix indents (Dec 4, 2017)
04878cd  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 7, 2017)
ead2dd9  Check byte size for exceeding cache limit (Dec 13, 2017)
cb12828  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 13, 2017)
8abf5ac  Fix indents (Dec 13, 2017)
d00bb28  Fix indents (Dec 13, 2017)
6bd296f  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 14, 2017)
53e8056  Add flag for result caching (Dec 14, 2017)
d4b823d  Remove logs (Dec 15, 2017)
6586b5c  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 18, 2017)
cb107f9  Make cache object generation synchronous (Dec 19, 2017)
8dd436d  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 19, 2017)
34d0128  Avoid saving intermediate cache results to list (Dec 19, 2017)
07eb46d  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 27, 2017)
9d05221  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Jan 30, 2018)
5a9dc8c  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Feb 2, 2018)
46da9e5  Fix changes that handle etag based response (Feb 2, 2018)
bcddabf  Release bytestream after use (Feb 2, 2018)
6c42e2c  Address PR comments (Feb 2, 2018)
d51c21e  Discard resultcache stream after use (Feb 2, 2018)
b8547bb  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Mar 9, 2018)
96bbf23  Fix docs (Mar 9, 2018)
28a7a0e  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Mar 19, 2018)
9dec1cd  Address comments (Mar 20, 2018)
b5b7992  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Mar 20, 2018)
edd79c0  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Mar 22, 2018)
7f2d48d  Add comment about fluent workflow issue (Mar 22, 2018)
3 changes: 3 additions & 0 deletions docs/content/configuration/broker.md
@@ -108,6 +108,9 @@ You can optionally only configure caching to be enabled on the broker by setting
 |--------|---------------|-----------|-------|
 |`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
 |`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
+|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false|
+|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false|
+|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`|
 |`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
 |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`|
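
For illustration, a Broker runtime.properties fragment that turns the new settings on might look like the sketch below. The property names come from the table above; the specific values are arbitrary examples, not recommendations.

```properties
# Enable reading from and writing to the result level cache on the Broker.
druid.broker.cache.useResultLevelCache=true
druid.broker.cache.populateResultLevelCache=true
# Example limit: skip caching any single query response larger than 5 MB.
druid.broker.cache.resultLevelCacheLimit=5242880
```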
8 changes: 5 additions & 3 deletions docs/content/querying/caching.md
@@ -3,9 +3,10 @@ layout: doc_page
---
# Query Caching

-Druid supports query result caching through an LRU cache. Results are stored on a per segment basis, along with the
-parameters of a given query. This allows Druid to return final results based partially on segment results in the cache and partially
-on segment results from scanning historical/real-time segments.
+Druid supports query result caching through an LRU cache. Results are stored either as a whole or on a per-segment basis, along
+with the parameters of a given query. Segment level caching allows Druid to return final results based partially on segment results
+in the cache and partially on segment results from scanning historical/real-time segments. Result level caching enables Druid to
+cache the entire result set, so that for identical queries the results can be served entirely from the cache.

Segment results can be stored in a local heap cache or in an external distributed key/value store. Segment query caches
can be enabled at either the Historical or Broker level (it is not recommended to enable caching on both).
@@ -15,6 +16,7 @@
Enabling caching on the broker can yield faster results than if query caches were enabled on Historicals for small clusters. This is
the recommended setup for smaller production clusters (< 20 servers). Take note that when caching is enabled on the Broker,
results from Historicals are returned on a per segment basis, and Historicals will not be able to do any local result merging.
+Result level caching is available only on the Broker.

## Query caching on Historicals
2 changes: 2 additions & 0 deletions docs/content/querying/query-context.md
@@ -15,6 +15,8 @@ The query context is used for various query configuration parameters.
 |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
 |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache |
 |populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses druid.broker.cache.populateCache or druid.historical.cache.populateCache to determine whether or not to save the results of this query to the query cache |
+|useResultLevelCache | `false` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the result level cache for this query. When set to true, Druid uses druid.broker.cache.useResultLevelCache to determine whether or not to read from the result level cache |
+|populateResultLevelCache | `false` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the result level cache. When set to true, Druid uses druid.broker.cache.populateResultLevelCache to determine whether or not to save the results of this query to the result level cache |
 |bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
 |finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
 |chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
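
As a usage sketch, a query opts in through its context. Everything below other than the two context flags is a generic example (hypothetical datasource, interval, and aggregator):

```json
{
  "queryType": "timeseries",
  "dataSource": "wikipedia",
  "intervals": ["2018-01-01/2018-01-02"],
  "granularity": "all",
  "aggregations": [{ "type": "count", "name": "rows" }],
  "context": {
    "useResultLevelCache": true,
    "populateResultLevelCache": true
  }
}
```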
25 changes: 21 additions & 4 deletions processing/src/main/java/io/druid/query/CacheStrategy.java
@@ -26,7 +26,7 @@
import java.util.concurrent.ExecutorService;

/**
 */
@ExtensionPoint
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
@@ -37,6 +37,7 @@
   * @param query            the query to be cached
   * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
   *                         called on the cached by-segment results
   *
   * @return true if the query is cacheable, otherwise false.
   */
  boolean isCacheable(QueryType query, boolean willMergeRunners);
@@ -45,6 +46,7 @@
  /**
   * Computes the cache key for the given query
   *
   * @param query the query to compute a cache key for
   *
   * @return the cache key
   */
  byte[] computeCacheKey(QueryType query);
@@ -58,17 +60,32 @@

  /**
   * Returns a function that converts from the QueryType's result type to something cacheable.
   * <p>
   * The resulting function must be thread-safe.
   *
   * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching
   *
   * @return a thread-safe function that converts the QueryType's result type into something cacheable
   */
-  Function<T, CacheType> prepareForCache();
+  Function<T, CacheType> prepareForCache(boolean isResultLevelCache);

  /**
   * A function that does the inverse of the operation that the function prepareForCache returns
   *
   * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching
   *
   * @return A function that does the inverse of the operation that the function prepareForCache returns
   */
-  Function<CacheType, T> pullFromCache();
+  Function<CacheType, T> pullFromCache(boolean isResultLevelCache);

+  default Function<T, CacheType> prepareForSegmentLevelCache()
+  {
+    return prepareForCache(false);
+  }
+
+  default Function<CacheType, T> pullFromSegmentLevelCache()
+  {
+    return pullFromCache(false);
+  }
}
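
To make the new parameter concrete, here is a hedged sketch of the round trip a result level caller would perform, mirroring the segment-level test pattern further down in this PR. Construction of `strategy` and `result` is elided (a timeseries strategy is assumed); only the CacheStrategy methods shown above are taken from this PR.

```java
// Serialize one result for the result level cache (true = include post-aggregators
// where the toolchest supports them), then deserialize and restore it.
Object preparedValue = strategy.prepareForCache(true).apply(result);

ObjectMapper objectMapper = new DefaultObjectMapper();
Object fromCacheValue = objectMapper.readValue(
    objectMapper.writeValueAsBytes(preparedValue),
    strategy.getCacheObjectClazz()
);

Result<TimeseriesResultValue> fromCacheResult = strategy.pullFromCache(true).apply(fromCacheValue);
Assert.assertEquals(result, fromCacheResult);
```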
22 changes: 22 additions & 0 deletions processing/src/main/java/io/druid/query/QueryContexts.java
@@ -37,6 +37,8 @@ public class QueryContexts
  public static final boolean DEFAULT_BY_SEGMENT = false;
  public static final boolean DEFAULT_POPULATE_CACHE = true;
  public static final boolean DEFAULT_USE_CACHE = true;
+ public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true;
+ public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
  public static final int DEFAULT_PRIORITY = 0;
  public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
  public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes

Review thread on DEFAULT_POPULATE_RESULTLEVEL_CACHE:

Contributor: New feature should start off disabled

Contributor (author): Would it be sufficient to keep it disabled via the populateResultCache parameter? ba78816#diff-661a9dbd25633c651845a9ecab0ddbaaR44

Contributor: It is confusing to have the default here be true but the default in CacheConfig be false. But this just follows the convention of the prior caching settings. Can you add a comment in CacheConfig that the defaults as stated in QueryContexts are different due to legacy reasons, and should probably be made the same at some point in the future?

Contributor (author): Added comment as suggested.

@@ -72,6 +74,26 @@
    return parseBoolean(query, "useCache", defaultValue);
  }

+ public static <T> boolean isPopulateResultLevelCache(Query<T> query)
+ {
+   return isPopulateResultLevelCache(query, DEFAULT_POPULATE_RESULTLEVEL_CACHE);
+ }
+
+ public static <T> boolean isPopulateResultLevelCache(Query<T> query, boolean defaultValue)
+ {
+   return parseBoolean(query, "populateResultLevelCache", defaultValue);
+ }
+
+ public static <T> boolean isUseResultLevelCache(Query<T> query)
+ {
+   return isUseResultLevelCache(query, DEFAULT_USE_RESULTLEVEL_CACHE);
+ }
+
+ public static <T> boolean isUseResultLevelCache(Query<T> query, boolean defaultValue)
+ {
+   return parseBoolean(query, "useResultLevelCache", defaultValue);
+ }
+
  public static <T> boolean isFinalize(Query<T> query, boolean defaultValue)
  {
    return parseBoolean(query, "finalize", defaultValue);
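
As the thread above notes, the context default (true) only takes effect when the Broker's CacheConfig enables the feature (default false). A plausible call-site sketch; the CacheConfig accessor name is an assumption, while the QueryContexts method is the one added here:

```java
// The query consults the result level cache only if both the query context
// (default true) and the broker CacheConfig (default false) allow it.
boolean useResultCache = QueryContexts.isUseResultLevelCache(query)
                         && cacheConfig.isUseResultLevelCache();  // assumed accessor
```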
processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -50,6 +50,7 @@
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.query.aggregation.MetricManipulatorFns;
+import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.cache.CacheKeyBuilder;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
@@ -408,7 +409,7 @@
  }

  @Override
- public Function<Row, Object> prepareForCache()
+ public Function<Row, Object> prepareForCache(boolean isResultLevelCache)
  {
    return new Function<Row, Object>()
    {
@@ -426,6 +427,11 @@
        for (AggregatorFactory agg : aggs) {
          retVal.add(event.get(agg.getName()));
        }
+       if (isResultLevelCache) {
+         for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
+           retVal.add(event.get(postAgg.getName()));
+         }
+       }
        return retVal;
      }

Review comment on this hunk (Member): @a2l007 `final List<Object> retVal = Lists.newArrayListWithCapacity(1 + dims.size() + aggs.size());` a few lines above is not updated wrt getPostAggregatorSpecs. Similar problems in other classes, updated in this PR. Also I noticed that somewhere this sizing was buggy before this PR already.

@@ -435,7 +441,7 @@
  }

  @Override
- public Function<Object, Row> pullFromCache()
+ public Function<Object, Row> pullFromCache(boolean isResultLevelCache)
  {
    return new Function<Object, Row>()
    {
@@ -460,7 +466,12 @@
        final AggregatorFactory factory = aggsIter.next();
        event.put(factory.getName(), factory.deserialize(results.next()));
      }
+     if (isResultLevelCache) {
+       Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
+       while (postItr.hasNext() && results.hasNext()) {
+         event.put(postItr.next().getName(), results.next());
+       }
+     }
      if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) {
        throw new ISE(
            "Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]",
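
The cached form of a groupBy row is a flat list, so a result level entry simply appends the finalized post-aggregator values after the aggregators, and pullFromCache(true) reads them back in getPostAggregatorSpecs() order. Schematically (dimension, aggregator, and post-aggregator names hypothetical):

```java
// Segment level entry:  [timestamp, dim0, ..., agg0, ...]
// Result level entry:   [timestamp, dim0, ..., agg0, ..., postAgg0, ...]
// e.g. for dims=[country], aggs=[rows], postAggs=[rows_per_hour]:
List<Object> resultLevelEntry = Arrays.asList(timestamp, "US", 42L, 1.75);
```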
processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -198,7 +198,7 @@
  }

  @Override
- public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache()
+ public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache(boolean isResultLevelCache)
  {
    return new Function<SegmentAnalysis, SegmentAnalysis>()
    {
@@ -211,7 +211,7 @@
  }

  @Override
- public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache()
+ public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache(boolean isResultLevelCache)
  {
    return new Function<SegmentAnalysis, SegmentAnalysis>()
    {
processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
@@ -206,7 +206,7 @@
  }

  @Override
- public Function<Result<SearchResultValue>, Object> prepareForCache()
+ public Function<Result<SearchResultValue>, Object> prepareForCache(boolean isResultLevelCache)
  {
    return new Function<Result<SearchResultValue>, Object>()
    {
@@ -221,7 +221,7 @@
  }

  @Override
- public Function<Object, Result<SearchResultValue>> pullFromCache()
+ public Function<Object, Result<SearchResultValue>> pullFromCache(boolean isResultLevelCache)
  {
    return new Function<Object, Result<SearchResultValue>>()
    {
processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
@@ -243,7 +243,7 @@
  }

  @Override
- public Function<Result<SelectResultValue>, Object> prepareForCache()
+ public Function<Result<SelectResultValue>, Object> prepareForCache(boolean isResultLevelCache)
  {
    return new Function<Result<SelectResultValue>, Object>()
    {
@@ -272,7 +272,7 @@
  }

  @Override
- public Function<Object, Result<SelectResultValue>> pullFromCache()
+ public Function<Object, Result<SelectResultValue>> pullFromCache(boolean isResultLevelCache)
  {
    return new Function<Object, Result<SelectResultValue>>()
    {
processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -171,7 +171,7 @@
  }

  @Override
- public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache()
+ public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache(boolean isResultLevelCache)
  {
    return new Function<Result<TimeBoundaryResultValue>, Object>()
    {
@@ -184,7 +184,7 @@
  }

  @Override
- public Function<Object, Result<TimeBoundaryResultValue>> pullFromCache()
+ public Function<Object, Result<TimeBoundaryResultValue>> pullFromCache(boolean isResultLevelCache)
  {
    return new Function<Object, Result<TimeBoundaryResultValue>>()
    {
processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -174,7 +174,7 @@
  }

  @Override
- public Function<Result<TimeseriesResultValue>, Object> prepareForCache()
+ public Function<Result<TimeseriesResultValue>, Object> prepareForCache(boolean isResultLevelCache)
  {
    return new Function<Result<TimeseriesResultValue>, Object>()
    {
@@ -188,14 +188,18 @@
        for (AggregatorFactory agg : aggs) {
          retVal.add(results.getMetric(agg.getName()));
        }
+       if (isResultLevelCache) {
+         for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
+           retVal.add(results.getMetric(postAgg.getName()));
+         }
+       }
        return retVal;
      }
    };
  }

  @Override
- public Function<Object, Result<TimeseriesResultValue>> pullFromCache()
+ public Function<Object, Result<TimeseriesResultValue>> pullFromCache(boolean isResultLevelCache)
  {
    return new Function<Object, Result<TimeseriesResultValue>>()
    {
@@ -216,6 +220,12 @@
        final AggregatorFactory factory = aggsIter.next();
        retVal.put(factory.getName(), factory.deserialize(resultIter.next()));
      }
+     if (isResultLevelCache) {
+       Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
+       while (postItr.hasNext() && resultIter.hasNext()) {
+         retVal.put(postItr.next().getName(), resultIter.next());
+       }
+     }

      return new Result<TimeseriesResultValue>(
          timestamp,
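
The reason post-aggregators appear only in the result level entry: segment level entries hold per-segment values that the Broker still merges and finalizes, while a result level hit is returned as-is, so the finalized post-aggregator values must be stored with it. A hypothetical query fragment that exercises this path:

```json
"aggregations": [
  { "type": "longSum", "name": "added", "fieldName": "added" },
  { "type": "count", "name": "rows" }
],
"postAggregations": [
  { "type": "arithmetic", "name": "added_per_row", "fn": "/",
    "fields": [
      { "type": "fieldAccess", "fieldName": "added" },
      { "type": "fieldAccess", "fieldName": "rows" }
    ] }
]
```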
processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
@@ -293,7 +293,6 @@
  }

  @Override
  public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(final TopNQuery query)
  {
@@ -341,7 +340,7 @@
  }

  @Override
- public Function<Result<TopNResultValue>, Object> prepareForCache()
+ public Function<Result<TopNResultValue>, Object> prepareForCache(boolean isResultLevelCache)
  {
    return new Function<Result<TopNResultValue>, Object>()
    {
@@ -361,6 +360,11 @@
        for (String aggName : aggFactoryNames) {
          vals.add(result.getMetric(aggName));
        }
+       if (isResultLevelCache) {
+         for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
+           vals.add(result.getMetric(postAgg.getName()));
+         }
+       }
        retVal.add(vals);
      }
      return retVal;
@@ -369,7 +373,7 @@
  }

  @Override
- public Function<Object, Result<TopNResultValue>> pullFromCache()
+ public Function<Object, Result<TopNResultValue>> pullFromCache(boolean isResultLevelCache)
  {
    return new Function<Object, Result<TopNResultValue>>()
    {
@@ -401,7 +405,12 @@
        for (PostAggregator postAgg : postAggs) {
          vals.put(postAgg.getName(), postAgg.compute(vals));
        }
+       if (isResultLevelCache) {
+         Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
+         while (postItr.hasNext() && resultIter.hasNext()) {
+           vals.put(postItr.next().getName(), resultIter.next());
+         }
+       }
        retVal.add(vals);
      }
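
One subtlety in the TopN pullFromCache above: post-aggregators are first recomputed from the restored aggregators via postAgg.compute(vals), and for a result level entry the cached finalized values then overwrite them, because the later put() on the same key wins. Illustrated with a hypothetical post-aggregator name:

```java
vals.put("added_per_row", postAgg.compute(vals)); // recomputed from aggregators
vals.put("added_per_row", resultIter.next());     // result level: cached value wins
```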
processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java
@@ -94,15 +94,15 @@
      null
  );

- Object preparedValue = strategy.prepareForCache().apply(result);
+ Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result);

  ObjectMapper objectMapper = new DefaultObjectMapper();
  SegmentAnalysis fromCacheValue = objectMapper.readValue(
      objectMapper.writeValueAsBytes(preparedValue),
      strategy.getCacheObjectClazz()
  );

- SegmentAnalysis fromCacheResult = strategy.pullFromCache().apply(fromCacheValue);
+ SegmentAnalysis fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);

  Assert.assertEquals(result, fromCacheResult);
}
processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java
@@ -59,7 +59,7 @@
      new SearchResultValue(ImmutableList.of(new SearchHit("dim1", "a")))
  );

- Object preparedValue = strategy.prepareForCache().apply(
+ Object preparedValue = strategy.prepareForSegmentLevelCache().apply(
      result
  );

@@ -69,7 +69,7 @@
      strategy.getCacheObjectClazz()
  );

- Result<SearchResultValue> fromCacheResult = strategy.pullFromCache().apply(fromCacheValue);
+ Result<SearchResultValue> fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);

  Assert.assertEquals(result, fromCacheResult);
}