-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add result level caching to Brokers #5028
Changes from 35 commits
ba78816
7cb33cd
83ee76e
24a7595
d2861b9
130ee63
efeb2b2
08b4feb
3639c0a
e1d9175
c805b92
d81d81c
d738fbc
7e3492c
fc69327
0d409ea
04878cd
ead2dd9
cb12828
8abf5ac
d00bb28
6bd296f
53e8056
d4b823d
6586b5c
cb107f9
8dd436d
34d0128
07eb46d
9d05221
5a9dc8c
46da9e5
bcddabf
6c42e2c
d51c21e
b8547bb
96bbf23
28a7a0e
9dec1cd
b5b7992
edd79c0
7f2d48d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,14 +61,16 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>> | |
* | ||
* The resulting function must be thread-safe. | ||
* | ||
* @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching | ||
* @return a thread-safe function that converts the QueryType's result type into something cacheable | ||
*/ | ||
Function<T, CacheType> prepareForCache(); | ||
Function<T, CacheType> prepareForCache(boolean isResultLevelCache); | ||
|
||
/** | ||
* A function that does the inverse of the operation that the function prepareForCache returns | ||
* | ||
* @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching | ||
* @return A function that does the inverse of the operation that the function prepareForCache returns | ||
*/ | ||
Function<CacheType, T> pullFromCache(); | ||
Function<CacheType, T> pullFromCache(boolean isResultLevelCache); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since a lot of calls default to |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,8 @@ public class QueryContexts | |
public static final boolean DEFAULT_BY_SEGMENT = false; | ||
public static final boolean DEFAULT_POPULATE_CACHE = true; | ||
public static final boolean DEFAULT_USE_CACHE = true; | ||
public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New feature should start off disabled There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be sufficient to keep it disabled via the populateResultCache parameter? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is confusing to have default here be Can you add a comment in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comment as suggested. |
||
public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true; | ||
public static final int DEFAULT_PRIORITY = 0; | ||
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; | ||
public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes | ||
|
@@ -72,6 +74,26 @@ public static <T> boolean isUseCache(Query<T> query, boolean defaultValue) | |
return parseBoolean(query, "useCache", defaultValue); | ||
} | ||
|
||
public static <T> boolean isPopulateResultLevelCache(Query<T> query) | ||
{ | ||
return isPopulateResultLevelCache(query, DEFAULT_POPULATE_RESULTLEVEL_CACHE); | ||
} | ||
|
||
public static <T> boolean isPopulateResultLevelCache(Query<T> query, boolean defaultValue) | ||
{ | ||
return parseBoolean(query, "populateResultLevelCache", defaultValue); | ||
} | ||
|
||
public static <T> boolean isUseResultLevelCache(Query<T> query) | ||
{ | ||
return isUseResultLevelCache(query, DEFAULT_USE_RESULTLEVEL_CACHE); | ||
} | ||
|
||
public static <T> boolean isUseResultLevelCache(Query<T> query, boolean defaultValue) | ||
{ | ||
return parseBoolean(query, "useResultLevelCache", defaultValue); | ||
} | ||
|
||
public static <T> boolean isFinalize(Query<T> query, boolean defaultValue) | ||
{ | ||
return parseBoolean(query, "finalize", defaultValue); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,7 @@ | |
import io.druid.query.aggregation.AggregatorFactory; | ||
import io.druid.query.aggregation.MetricManipulationFn; | ||
import io.druid.query.aggregation.MetricManipulatorFns; | ||
import io.druid.query.aggregation.PostAggregator; | ||
import io.druid.query.cache.CacheKeyBuilder; | ||
import io.druid.query.dimension.DefaultDimensionSpec; | ||
import io.druid.query.dimension.DimensionSpec; | ||
|
@@ -408,7 +409,7 @@ public TypeReference<Object> getCacheObjectClazz() | |
} | ||
|
||
@Override | ||
public Function<Row, Object> prepareForCache() | ||
public Function<Row, Object> prepareForCache(boolean isResultLevelCache) | ||
{ | ||
return new Function<Row, Object>() | ||
{ | ||
|
@@ -426,6 +427,11 @@ public Object apply(Row input) | |
for (AggregatorFactory agg : aggs) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
a few lines above is not updated wrt getPostAggregatorSpecs. Similar problems in other classes, updated in this PR. Also I noticed that somewhere this sizing was buggy before this PR already. |
||
retVal.add(event.get(agg.getName())); | ||
} | ||
if (isResultLevelCache) { | ||
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { | ||
retVal.add(event.get(postAgg.getName())); | ||
} | ||
} | ||
return retVal; | ||
} | ||
|
||
|
@@ -435,7 +441,7 @@ public Object apply(Row input) | |
} | ||
|
||
@Override | ||
public Function<Object, Row> pullFromCache() | ||
public Function<Object, Row> pullFromCache(boolean isResultLevelCache) | ||
{ | ||
return new Function<Object, Row>() | ||
{ | ||
|
@@ -460,7 +466,12 @@ public Row apply(Object input) | |
final AggregatorFactory factory = aggsIter.next(); | ||
event.put(factory.getName(), factory.deserialize(results.next())); | ||
} | ||
|
||
if (isResultLevelCache) { | ||
Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator(); | ||
while (postItr.hasNext() && results.hasNext()) { | ||
event.put(postItr.next().getName(), results.next()); | ||
} | ||
} | ||
if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) { | ||
throw new ISE( | ||
"Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as per below, I thought this defaulted to
false
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed the docs.