
Add result level caching to Brokers #5028

Merged: 42 commits, Mar 24, 2018

Commits (42):
ba78816  Add result level caching to Brokers (Oct 31, 2017)
7cb33cd  Minor doc changes (Oct 31, 2017)
83ee76e  Simplify sequences (Nov 1, 2017)
24a7595  Move etag execution (Nov 10, 2017)
d2861b9  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Nov 10, 2017)
130ee63  Modify cacheLimit criteria (Nov 10, 2017)
efeb2b2  Fix incorrect etag computation (Nov 10, 2017)
08b4feb  Fix docs (Nov 10, 2017)
3639c0a  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Nov 18, 2017)
e1d9175  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Nov 22, 2017)
c805b92  Add separate query runner for result level caching (Nov 22, 2017)
d81d81c  Update docs (Nov 22, 2017)
d738fbc  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Nov 27, 2017)
7e3492c  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 4, 2017)
fc69327  Add post aggregated results to result level cache (Dec 4, 2017)
0d409ea  Fix indents (Dec 4, 2017)
04878cd  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 7, 2017)
ead2dd9  Check byte size for exceeding cache limit (Dec 13, 2017)
cb12828  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 13, 2017)
8abf5ac  Fix indents (Dec 13, 2017)
d00bb28  Fix indents (Dec 13, 2017)
6bd296f  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 14, 2017)
53e8056  Add flag for result caching (Dec 14, 2017)
d4b823d  Remove logs (Dec 15, 2017)
6586b5c  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 18, 2017)
cb107f9  Make cache object generation synchronous (Dec 19, 2017)
8dd436d  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 19, 2017)
34d0128  Avoid saving intermediate cache results to list (Dec 19, 2017)
07eb46d  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Dec 27, 2017)
9d05221  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Jan 30, 2018)
5a9dc8c  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Feb 2, 2018)
46da9e5  Fix changes that handle etag based response (Feb 2, 2018)
bcddabf  Release bytestream after use (Feb 2, 2018)
6c42e2c  Address PR comments (Feb 2, 2018)
d51c21e  Discard resultcache stream after use (Feb 2, 2018)
b8547bb  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Mar 9, 2018)
96bbf23  Fix docs (Mar 9, 2018)
28a7a0e  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Mar 19, 2018)
9dec1cd  Address comments (Mar 20, 2018)
b5b7992  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Mar 20, 2018)
edd79c0  Merge branch 'master' of https://github.com/druid-io/druid into resul… (Mar 22, 2018)
7f2d48d  Add comment about fluent workflow issue (Mar 22, 2018)
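For context on what these commits add up to: the result-level cache this PR introduces is switched on through broker cache properties (with matching useResultLevelCache / populateResultLevelCache query-context flags). The property names below are taken from Druid's later caching docs for this feature, so treat them as an assumption to verify against your Druid version:

    # Broker runtime.properties (names per later Druid docs; verify for your version)
    druid.broker.cache.useResultLevelCache=true
    druid.broker.cache.populateResultLevelCache=true
    # Max size in bytes of a single result-level cache entry; compare the
    # cacheConfig.getResultLevelCacheLimit() checks in the diff below
    druid.broker.cache.resultLevelCacheLimit=1048576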
Viewing changes from 1 commit:
Avoid saving intermediate cache results to list
Atul Mohan committed Dec 19, 2017
commit 34d01284274881f2441b5845a8b667d260f356fd
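The core change in this commit, sketched outside the Druid codebase (illustrative names, not the PR's API): rather than keeping every prepared row in a List<Object> and serializing the whole list once the sequence ends, each row is written through a Jackson JsonGenerator into one shared ByteArrayOutputStream as it streams past, so the broker never holds a second, object-form copy of the result set:

    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.List;

    class CacheSerialization
    {
      private static final ObjectMapper MAPPER = new ObjectMapper();

      // Before: every prepared row is buffered in a list, then serialized at the end.
      static byte[] bufferThenSerialize(List<Object> rows) throws IOException
      {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Object row : rows) {
          try (JsonGenerator gen = MAPPER.getFactory().createGenerator(out)) {
            gen.writeObject(row);
          }
        }
        return out.toByteArray();
      }

      // After: each row is appended to the shared stream as it arrives,
      // so no intermediate list of cache objects is kept alive.
      static void serializeIncrementally(ByteArrayOutputStream out, Object row) throws IOException
      {
        try (JsonGenerator gen = MAPPER.getFactory().createGenerator(out)) {
          gen.writeObject(row);
        }
      }
    }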
120 changes: 65 additions & 55 deletions server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java
@@ -41,9 +41,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
@@ -108,18 +106,21 @@ public Sequence<T> run(QueryPlus queryPlus, Map responseContext)
cacheKeyStr,
newResultSetId
);

if (resultLevelCachePopulator == null) {
return resultFromClient;
}
final Function<T, Object> cacheFn = strategy.prepareForCache(true);

return Sequences.wrap(Sequences.map(
resultFromClient,
new Function<T, T>()
{
@Override
public T apply(T input)
{
cacheResultEntry(resultLevelCachePopulator, input);
if (resultLevelCachePopulator.isShouldPopulate()) {
Contributor: we are doing this check for every row in the result set, whereas it should only be done once.

Contributor: never mind, I see it's checking whether we've crossed the size limit.

resultLevelCachePopulator.cacheResultEntry(resultLevelCachePopulator, input, cacheFn);
}
return input;
}
}
@@ -128,14 +129,22 @@ public T apply(T input)
@Override
public void after(boolean isDone, Throwable thrown) throws Exception
Contributor: thrown is ignored; what if there was an exception in the stage before?

{
Preconditions.checkNotNull(resultLevelCachePopulator, "ResultLevelCachePopulator cannot be null during cache population");
Preconditions.checkNotNull(
resultLevelCachePopulator,
"ResultLevelCachePopulator cannot be null during cache population"
);
if (thrown != null) {
log.error("Error while preparing for result level caching for query %s ", query.getId());
} else {
log.error(
Contributor: (minor) why not pass the error as a parameter to error()?

"Error while preparing for result level caching for query %s with error %s ",
query.getId(),
thrown.getMessage()
);
} else if (resultLevelCachePopulator.isShouldPopulate()) {
// The resultset identifier and its length is cached along with the resultset
resultLevelCachePopulator.populateResults(newResultSetId);
resultLevelCachePopulator.populateResults();
log.debug("Cache population complete for query %s", query.getId());
}
resultLevelCachePopulator.cacheObjectStream.close();
}
});
}
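The structure this file builds in run() (visible across the hunks above) is a streaming decoration: Sequences.map serializes each row into the cache buffer while it flows to the client, and the wrapper's after() hook publishes or abandons the buffer once the sequence is fully consumed. A rough standalone analogy using java.util.stream rather than Druid's Sequence API:

    import java.util.stream.Stream;

    class StreamingDecorationDemo
    {
      // Analogy only: peek() stands in for the per-row hook in Sequences.map,
      // and onClose() for the end-of-sequence hook in Sequences.wrap/after().
      static Stream<String> decorate(Stream<String> results, StringBuilder buffer)
      {
        return results
            .peek(buffer::append)                                   // observe each row in transit
            .onClose(() -> System.out.println("cache: " + buffer)); // runs once consumption ends
      }

      public static void main(String[] args)
      {
        try (Stream<String> rows = decorate(Stream.of("a", "b", "c"), new StringBuilder())) {
          rows.forEach(row -> { }); // the client consumes rows as usual
        } // closing fires the completion hook, like the sequence being fully drained
      }
    }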
@@ -147,17 +156,6 @@ public void after(boolean isDone, Throwable thrown) throws Exception
}
}

private T cacheResultEntry(
ResultLevelCachePopulator resultLevelCachePopulator,
T resultEntry
)
{
final Function<T, Object> cacheFn = strategy.prepareForCache(true);
resultLevelCachePopulator.cacheObjects
.add(cacheFn.apply(resultEntry));
return resultEntry;
}

private byte[] fetchResultsFromResultLevelCache(
final String queryCacheKey
)
@@ -216,12 +214,25 @@ private ResultLevelCachePopulator createResultLevelCachePopulator(
)
{
if (resultSetId != null && populateResultCache) {
return new ResultLevelCachePopulator(
ResultLevelCachePopulator resultLevelCachePopulator = new ResultLevelCachePopulator(
cache,
objectMapper,
ResultLevelCacheUtil.computeResultLevelCacheKey(cacheKeyStr),
cacheConfig
cacheConfig,
true
);
try {
// Save the resultSetId and its length
resultLevelCachePopulator.cacheObjectStream.write(ByteBuffer.allocate(Integer.BYTES)
.putInt(resultSetId.length())
.array());
resultLevelCachePopulator.cacheObjectStream.write(StringUtils.toUtf8(resultSetId));
}
catch (IOException ioe) {
log.error("Failed to write cached values for query %s", query.getId());
return null;
}
return resultLevelCachePopulator;
} else {
return null;
}
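The hunk above length-prefixes the resultSetId: a 4-byte int holding resultSetId.length(), followed by the id's UTF-8 bytes, written ahead of the result rows so a reader can tell which result set the cached bytes belong to. A standalone sketch of that header with a hypothetical read() counterpart (note the PR stores the character count, which equals the byte count only for ASCII ids):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    final class ResultSetIdHeader
    {
      // Mirrors the diff above: [4-byte length][UTF-8 bytes of the id].
      static void write(ByteArrayOutputStream out, String resultSetId) throws IOException
      {
        out.write(ByteBuffer.allocate(Integer.BYTES).putInt(resultSetId.length()).array());
        out.write(resultSetId.getBytes(StandardCharsets.UTF_8));
      }

      // Hypothetical counterpart: recover the id from a cached value.
      // Treats the stored length as a byte count, which holds for ASCII ids.
      static String read(byte[] cached)
      {
        ByteBuffer buf = ByteBuffer.wrap(cached);
        byte[] idBytes = new byte[buf.getInt()];
        buf.get(idBytes);
        return new String(idBytes, StandardCharsets.UTF_8);
      }
    }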
@@ -232,60 +243,59 @@ public class ResultLevelCachePopulator
private final Cache cache;
private final ObjectMapper mapper;
private final Cache.NamedKey key;
private final List<Object> cacheObjects = new ArrayList<>();
private final CacheConfig cacheConfig;
private final ByteArrayOutputStream cacheObjectStream = new ByteArrayOutputStream();

public boolean isShouldPopulate()
{
return shouldPopulate;
}

private boolean shouldPopulate;

private ResultLevelCachePopulator(
Cache cache,
ObjectMapper mapper,
Cache.NamedKey key,
CacheConfig cacheConfig
CacheConfig cacheConfig,
boolean shouldPopulate
)
{
this.cache = cache;
this.mapper = mapper;
this.key = key;
this.cacheConfig = cacheConfig;
this.shouldPopulate = shouldPopulate;
}

public void populateResults(String resultSetIdentifier)
private void cacheResultEntry(
ResultLevelCachePopulator resultLevelCachePopulator,
T resultEntry,
Function<T, Object> cacheFn
)
{
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
// Save the resultSetId and its length
bytes.write(ByteBuffer.allocate(Integer.BYTES).putInt(resultSetIdentifier.length()).array());
bytes.write(StringUtils.toUtf8(resultSetIdentifier));
byte[] resultBytes = fetchResultBytes(bytes, cacheConfig.getCacheBulkMergeLimit());
if (resultBytes != null) {
ResultLevelCacheUtil.populate(
cache,
key,
resultBytes
);
}
// Help out GC by making sure all references are gone
cacheObjects.clear();

int cacheLimit = cacheConfig.getResultLevelCacheLimit();
if (cacheLimit > 0 && resultLevelCachePopulator.cacheObjectStream.size() > cacheLimit) {
shouldPopulate = false;
return;
}
catch (IOException ioe) {
log.error("Failed to write cached values for query %s", query.getId());
try (JsonGenerator gen = mapper.getFactory().createGenerator(resultLevelCachePopulator.cacheObjectStream)) {
gen.writeObject(cacheFn.apply(resultEntry));
Contributor: if the last element in the sequence took you over the size limit, shouldPopulate would still stay true, resulting in the entry getting stored even though it crossed the size limit.

Also, as soon as we cross the size limit we should discard the data stored in cacheObjectStream, because we know it is not going to be used, and let it be GC'd.

}
catch (IOException ex) {
log.error("Failed to retrieve entry to be cached. Result Level caching will not be performed!");
Contributor: should probably log the error here.

Contributor: i.e., include the exception in the error parameters for the logger call.

shouldPopulate = false;
}
}
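Taken together, the two threads above point at a different shape for cacheResultEntry: check the size limit after writing (so the final element cannot slip past it), drop the buffered bytes as soon as the limit is crossed, and hand the exception object to the logger. A hypothetical sketch of that shape against the same fields, not the code that was merged:

    private void cacheResultEntry(T resultEntry, Function<T, Object> cacheFn)
    {
      if (!shouldPopulate) {
        return; // limit already exceeded or serialization already failed
      }
      try (JsonGenerator gen = mapper.getFactory().createGenerator(cacheObjectStream)) {
        gen.writeObject(cacheFn.apply(resultEntry));
      }
      catch (IOException ex) {
        // Pass the exception itself so the stack trace is logged
        // (assumes Druid's Logger error(Throwable, String, Object...) overload).
        log.error(ex, "Failed to serialize entry to be cached; skipping result level caching");
        shouldPopulate = false;
        return;
      }
      int cacheLimit = cacheConfig.getResultLevelCacheLimit();
      // Check AFTER the write so even the last element cannot push the entry over.
      if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
        shouldPopulate = false;
        // reset() empties the stream; to let the backing array itself be GC'd,
        // the field would need to be non-final and nulled out here.
        cacheObjectStream.reset();
      }
    }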

private byte[] fetchResultBytes(ByteArrayOutputStream resultStream, int cacheLimit)
public void populateResults()
{
for (Object cacheObj : cacheObjects) {
try (JsonGenerator gen = mapper.getFactory().createGenerator(resultStream)) {
gen.writeObject(cacheObj);
if (cacheLimit > 0 && resultStream.size() > cacheLimit) {
return null;
}
}
catch (IOException ex) {
log.error("Failed to retrieve entry to be cached. Result Level caching will not be performed!");
return null;
}
}
return resultStream.toByteArray();
ResultLevelCacheUtil.populate(
Contributor: it might be cleaner to do the check of whether we crossed the limit at this point. I know it's done externally, but it may be clearer to do it here.

cache,
key,
cacheObjectStream.toByteArray()
);
}
}
}
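And a hypothetical populateResults that folds the limit check in at publish time, as the final comment above suggests (the merged code kept the check in cacheResultEntry):

    public void populateResults()
    {
      byte[] cachedResults = cacheObjectStream.toByteArray();
      int cacheLimit = cacheConfig.getResultLevelCacheLimit();
      // Final guard at publish time: never store an entry over the limit,
      // regardless of what the per-row bookkeeping concluded.
      if (cacheLimit > 0 && cachedResults.length > cacheLimit) {
        return;
      }
      ResultLevelCacheUtil.populate(cache, key, cachedResults);
    }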