IndexTask improvements #3611

Merged: 4 commits merged into apache:master on Jan 18, 2017

Conversation

dclim commented Oct 26, 2016

This PR introduces a number of improvements to the IndexTask (the non-Hadoop batch indexer). The primary motivation for improving IndexTask is that it could be used with something like #1998 for automatic merging and splitting of segments for users without Hadoop. This would replace the functionality of the MergeTask, which is limited by the fact that it can only operate on NoneShardSpec-type segments. A secondary motivation is to improve the performance and stability of IndexTask for users who don't run a Hadoop cluster.

Main changes:

  • Overall performance: The old implementation iterated through the entire data set once to determine the intervals, once per interval to determine the shards for that interval, and once per shard to generate its index; hence if your segmentGranularity resulted in 12 time intervals and each interval had 3 shards, the entire dataset was read 1 + 12 + 36 = 49 times. This has been reduced to 1 pass if numShards and intervals are provided, and 2 passes otherwise. Performance gains will vary with the number of intervals/shards, but are better than the previous implementation in all cases and typically significantly better. Performance compared to the Hadoop index task running locally is typically equivalent or better. The Appenderator implementation is now used for index generation.
  • ReplayableFirehose: Introduced a new firehose wrapper type that entirely consumes the wrapped firehose into temporary files and then serves read requests from the cached data. This has a few benefits:
    • In the case that we need to read the data more than once, it prevents us from fetching from remote resources multiple times (saves time, potentially money)
    • In the case that we need to read the data more than once, it gives us deterministic results where the wrapped firehose may not
    • In the case that the underlying firehose is unreliable, it allows us to retry fetching until we have the entire dataset and only begin serving data after we have a full copy (this addresses a number of issues recently reported with the S3 firehose failing in the middle of reading)
    • I did not document the ReplayableFirehose, even though it could theoretically wrap any other firehose, because I'm not sure how much use it has outside of the index task. IndexTask wraps any provided firehose in a ReplayableFirehose by default, although this behavior can be disabled by configuration.
  • The segmentGranularity intervals no longer need to be explicitly provided; they can be determined from the data, which could make things easier for some use cases
  • Append support: Users can set the appendToExisting flag, which will generate segments as additional partitions added to the latest version of the existing segments, effectively 'appending' data to the datasource. The set of segments being appended to must use extendable shardSpecs (i.e. they can't be NoneShardSpecs).
  • Reworked IndexTuningConfig so its parameter names match those of other index task types. If people agree with this change, note that it is backwards-incompatible and would require users to update their ingestion specs.

This PR depends on #3483 being merged and I don't expect tests to pass until that happens.

@dclim dclim added this to the 0.9.3 milestone Oct 26, 2016
fjy commented Oct 27, 2016

@dclim any benchmarks on how much faster this is?

@@ -187,7 +187,7 @@ This spec is used to generated segments with uniform intervals.
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no |
Contributor:

does hadoop index task need intervals?

dclim (author):

I checked again, and it looks like it's almost coded not to need intervals, but I think there might be one set of configs that will fail without intervals. I'll test it out and fix it (if easy) or revert this (if non-trivial).

dclim (author):

I fixed Hadoop tasks to work without intervals

dclim (author):

Update: I split this code out from this review and reverted this to indicate Hadoop still needs intervals

|type|The task type, this should always be "index".|none|yes|
|firehose|Specify a [Firehose](../ingestion/firehose.html) here.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs (which can be forced by setting 'forceExtendableShardSpecs' in the tuning config).|false|no|
|skipFirehoseCaching|By default the IndexTask will fully read the supplied firehose to disk before processing the data. This prevents the task from doing multiple remote fetches and enforces determinism if more than one pass through the data is required. It also allows the task to retry fetching the data if the firehose throws an exception during reading. This requires sufficient disk space for the temporary cache.|false|no|
Contributor:

why not set this to true always?

Contributor:

ah nvm, for remote files it is better to cache

shardSpecForPublishing = shardSpec;
}
finally {
if (Closeable.class.isAssignableFrom(firehoseFactory.getClass())) {
Contributor:

can we just close the firehose instead of the factory?

Contributor:

is it possible to move this code to where the firehose is created and call close there?

dclim (author):

A design goal here was to keep the existing Firehose and FirehoseFactory interfaces without modification. So the first time a user calls ReplayableFirehoseFactory.connect(), it creates a new firehose which reads the delegate firehose into cached files. Once the ReplayableFirehose is read through, it's expected the user will close the firehose, which closes the file stream but leaves the cached files on disk (obviously, that's the whole point).

The ReplayableFirehoseFactory maintains a reference to the firehose it created so when connect() is called again, it returns the original firehose "rewound", and doesn't read from the delegate again.

So the purpose of closing the factory is really just to delete those temporary files to free up disk space early. The files are all marked with deleteOnExit() so forgetting to close the factory isn't fatal, but closing it is nicer.
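A minimal, self-contained sketch of the pattern described above, using hypothetical simplified RowFirehose/RowFirehoseFactory interfaces in place of Druid's actual Firehose/FirehoseFactory API (the real implementation also streams rows into multiple size-capped temp files rather than buffering them in memory as this sketch does):

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-ins for the real interfaces, for illustration only.
interface RowFirehose extends Closeable { boolean hasMore(); String nextRow(); }
interface RowFirehoseFactory { RowFirehose connect() throws IOException; }

class ReplayableRowFirehoseFactory implements RowFirehoseFactory, Closeable
{
  private final RowFirehoseFactory delegate;
  private File cacheFile; // survives firehose.close() so the data can be replayed

  ReplayableRowFirehoseFactory(RowFirehoseFactory delegate) { this.delegate = delegate; }

  @Override
  public RowFirehose connect() throws IOException
  {
    if (cacheFile == null) {
      // First connect(): drain the delegate firehose into a temp file.
      cacheFile = File.createTempFile("replayable-", null);
      cacheFile.deleteOnExit();
      List<String> rows = new ArrayList<>();
      try (RowFirehose in = delegate.connect()) {
        while (in.hasMore()) {
          rows.add(in.nextRow());
        }
      }
      Files.write(cacheFile.toPath(), rows, StandardCharsets.UTF_8);
    }
    // Every connect() (first read and replays) serves rows from the cache, "rewound" to the start.
    final Iterator<String> it = Files.readAllLines(cacheFile.toPath(), StandardCharsets.UTF_8).iterator();
    return new RowFirehose()
    {
      @Override public boolean hasMore() { return it.hasNext(); }
      @Override public String nextRow() { return it.next(); }
      @Override public void close() { /* keep the cache file; only the factory's close() removes it */ }
    };
  }

  @Override
  public void close() throws IOException
  {
    // Closing the factory only frees disk space early; deleteOnExit() covers the forgetful case.
    if (cacheFile != null) {
      Files.deleteIfExists(cacheFile.toPath());
    }
  }
}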

Member:

Why not just firehoseFactory instanceof Closeable?

dclim (author):

That's better, thanks
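For reference, a tiny sketch of the two equivalent checks being discussed (the helper class and method are hypothetical, not from the PR):

import java.io.Closeable;
import java.io.IOException;

class CloseHelper
{
  // Closes the factory only if it happens to implement Closeable.
  // "factory instanceof Closeable" is the idiomatic form of
  // Closeable.class.isAssignableFrom(factory.getClass()), and it also tolerates null.
  static void closeIfCloseable(Object factory) throws IOException
  {
    if (factory instanceof Closeable) {
      ((Closeable) factory).close();
    }
  }
}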

@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary,
@JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
Contributor:

we can have some code for backwards compat to support the old names for a release or two, and make sure to comment that they are deprecated

dclim (author):

I also changed the semantics of how targetPartitionSize and numShards work together because I found them a bit awkward: previously you had to set targetPartitionSize to -1 if you wanted to use numShards. I made it so that if you just leave out targetPartitionSize and set numShards it'll work, but I have a bounds check that targetPartitionSize can't be negative. I guess I can special-case -1 for backwards compatibility.

Contributor:

Backwards compatibility is important, since this is not a major release. Even in majors it's nice to be compatible unless there's a really strong reason not to. So let's accept either null or -1 as meaning "don't do this".

dclim (author):

Sounds good
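A minimal sketch of the compatibility handling discussed above (class name and default value are hypothetical, not the PR's actual code): a null or legacy -1 targetPartitionSize both mean "use numShards instead".

class PartitionSpecSketch
{
  private static final int DEFAULT_TARGET_PARTITION_SIZE = 5_000_000; // hypothetical default

  private final Integer targetPartitionSize; // null => partitioning is driven by numShards
  private final Integer numShards;

  PartitionSpecSketch(Integer targetPartitionSize, Integer numShards)
  {
    // Backwards compatibility: the legacy sentinel -1 is normalized to null.
    Integer target =
        (targetPartitionSize != null && targetPartitionSize == -1) ? null : targetPartitionSize;

    if (target != null && numShards != null) {
      throw new IllegalArgumentException("targetPartitionSize and numShards cannot both be set");
    }
    if (target != null && target <= 0) {
      throw new IllegalArgumentException("targetPartitionSize must be positive");
    }

    this.numShards = numShards;
    this.targetPartitionSize =
        (target == null && numShards == null) ? DEFAULT_TARGET_PARTITION_SIZE : target;
  }
}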

private ReplayableFirehose firehose;

@JsonCreator
public ReplayableFirehoseFactory(
Contributor:

is this firehose documented?

dclim (author):

No, as I mentioned in the initial comment, I wasn't sure whether or not to document it because I don't know how much use it'll have outside of IndexTask which uses it by default. Ah, I'll just write docs for it.

Contributor:

IMO it makes sense to leave internal-use decorator sorts of things undocumented. Docs would just be confusing, since we don't expect anyone to actually use it.

dclim (author):

Agreed

Member:

@gianm is "no docs = for internal use" is an official policy?

Contributor:

@leventov no, but if the docs provide no value, then there's no point to add them

gianm (Contributor) commented Nov 8, 2016:

We do have an official policy that things that are documented will be stable within a 0.X release line (see versioning.md). Things that aren't documented might change without warning in any release, possibly even disappearing.

I think if we don't intend for something to be an external API, it's best to leave it undocumented, since otherwise we create an API stability burden we don't really need.

leventov (Member) commented Nov 8, 2016:

@gianm I cannot find such statements in https://github.com/druid-io/druid/blob/d981a2aa02aedfdf18e0bed5848e704bd9597563/docs/content/development/versioning.md.

This policy seems strange to me, because it's inhibiting a good practice of documenting code.

Contributor:

@leventov there have been countless experiences where providing too much documentation just served to confuse people

dimensions = Ordering.natural().immutableSortedCopy(dimensionScratch);

if (counter == 0) {
log.warn("Firehose contains no events!");
Contributor:

can we print the name of the file as well?

dclim (author):

We can't really print any useful identifier here since it's abstracted by the Firehose interface. It may not even be a file.

@@ -0,0 +1,34 @@
package io.druid.segment.realtime.plumber;
Contributor:

missing license header

dclim commented Oct 27, 2016

@fjy regarding benchmarks, the performance improvement is highly dependent on the number of intervals/partitions generated. Here's some data from some of my tests:

Ingesting 6M rows of TPCH in 10 shards:

  • Old index task: 11 minutes
  • New index task: 7 minutes

Ingesting 6M rows of TPCH in 84 shards:

  • Old index task: 53 minutes
  • New index task: 13 minutes

Ingesting 30M rows of TPCH in ~50 shards:

  • Old index task: 101 minutes
  • New index task: 41 minutes


ryryguy commented Oct 27, 2016

Hi, S3 timeouts are currently giving us major headaches trying to run backfills under 0.9.1.1. We basically cannot scale up our indexing capacity effectively because of this issue.

What would be the prospect of applying this PR onto 0.9.1.1 if we wanted to try it out in a private build? Does it depend on other post 0.9.1.1 changes? Perhaps just teasing out the ReplayableFirehose stuff since that looks like the main part that would help us?

fjy commented Oct 27, 2016

@ryryguy the index task is not recommended for batch ingestion of files greater than 1GB. You should be using hadoop based batch indexing. See https://imply.io/docs/latest/ingestion-batch


ryryguy commented Oct 27, 2016

@fjy our source files are basically right around 1GB. Some are over (up to 1.6 GB), some are under (down to 500 MB). I'm not sure if the S3 timeouts are correlated to ingestion file size - maybe somewhat, but when we experience periods where the failure rate approaches 100%, that is definitely including files that are well under 1GB.

We briefly took a shot at setting up hadoop based batch indexing and couldn't get it to work. (We have basically zero experience with hadoop on our team...) Since it also reads files from S3, we weren't sure it would really address the problem. But if you think it might help, we can take another shot at it.

gianm commented Oct 27, 2016

@ryryguy The hadoop index task does have automatic retries, at least.

dclim commented Oct 27, 2016

@ryryguy I believe it should be possible to patch this onto 0.9.1.1, keeping the following in mind:

That's all I can think of right now but I may have missed something. Use at your own risk :)

As @fjy mentioned, the recommendation for most use cases is to use a Hadoop cluster for batch ingestion, since it will scale and process files in parallel whereas the index task will not. The Hadoop framework retries failed tasks, so that should help with transient S3 issues.

@fjy fjy closed this Oct 28, 2016
@fjy fjy reopened this Oct 28, 2016
try (CountingOutputStream cos = new CountingOutputStream(new FileOutputStream(tmpFile));
JsonGenerator generator = factory.createGenerator(cos)) {

while (delegateFirehose.hasMore() && cos.getCount() < getMaxTempFileSize()) {
Contributor:

is the hasMore check here required?

Contributor:

what happens if the maxtempfilesize is incorrectly configured, such as a negative value?

Contributor:

actually that should be fine

fjy commented Oct 28, 2016

👍

dclim commented Oct 28, 2016

Unit tests won't pass until #3483 is in

@fjy fjy closed this Oct 31, 2016
@fjy fjy reopened this Oct 31, 2016
}

return id;
return id != null ? id : String.format("index_%s_%s", makeDataSource(ingestionSchema), new DateTime().toString());
Member:

Don't need toString()

dclim (author):

thanks

.isPresent();

final FirehoseFactory delegateFirehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().isSkipFirehoseCaching()
Member:

Maybe use if-else instead of such a big ternary expression?

dclim (author):

sure


File tmpDir = Files.createTempDir();
if (DELETE_TEMP_FILES) {
tmpDir.deleteOnExit();
Member:

Firehose is Closeable, maybe delete the tmpDir in close()?

dclim (author):

Deleting the directory in ReplayableFirehose.close() breaks the functionality of the firehose since it needs to retain the cache files between uses. We could delete the directory in ReplayableFirehoseFactory.close(), but I don't think it really makes a difference deleting the directory immediately vs when the process exits.

Member:

@dclim ok, thanks. Maybe you could add a comment to the code explaining this?

dclim (author):

Sounds good


log.info(
"Finished reading from firehose in [%,dms], [%,d] events parsed, [%,d] bytes written, [%,d] events unparseable",
System.currentTimeMillis() - ingestStartMillis,
Member:

Maybe report this time as a metric somehow?

dclim (author):

Might be interesting, but probably not necessary in this PR

tmpDir.deleteOnExit();
}

long ingestStartMillis = System.currentTimeMillis();
Member:

Maybe use nanoTime()? It's much more precise, which may matter at least in some cases.

dclim (author):

Sounds good
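A small sketch of the suggested change (class and method names are hypothetical): nanoTime() gives a monotonic reading, converted to milliseconds only when logging.

import java.util.concurrent.TimeUnit;

class Timing
{
  // Measures elapsed wall time of a task using the monotonic nanosecond clock.
  static long elapsedMillis(Runnable task)
  {
    final long start = System.nanoTime();
    task.run();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }
}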

jsonParser = factory.createParser(files.get(fileIndex));
it = jsonParser.readValuesAs(Row.class);
}
catch (IOException e) {
Member:

If catching just IOException, Throwables.propagate() is a synonym for throw new RuntimeException(e); the latter should be preferred IMO because it is less library-ish.

dclim (author):

fair point, thanks
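The two forms being compared, as a small sketch (assuming only IOException is caught, so both behave the same):

import java.io.IOException;

class Rethrow
{
  static void readOrFail()
  {
    try {
      throw new IOException("simulated read failure"); // stand-in for the parser call
    }
    catch (IOException e) {
      // Guava flavor: throw Throwables.propagate(e);
      // Plain-JDK equivalent, preferred here because it avoids the library call:
      throw new RuntimeException(e);
    }
  }
}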

@Override
public void close() throws IOException
{
CloseQuietly.close(jsonParser);
Member:

If close() throws IOException, why consume possible IOException in jsonParser?

private static final Logger log = new Logger(NoopSegmentHandoffNotifierFactory.class);

@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
Member:

dataSource is not used in the returned SegmentHandoffNotifier. Not sure if it is as intended or not. If it is not, maybe cache a SegmentHandoffNotifier in a static field?

dclim (author):

I'll cache it, thanks!
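A sketch of the suggestion, with a hypothetical minimal notifier interface standing in for Druid's SegmentHandoffNotifier: since the returned notifier ignores dataSource, one shared instance can live in a static field.

class NoopHandoffSketch
{
  // Hypothetical stand-in for the real SegmentHandoffNotifier interface.
  interface Notifier
  {
    void start();
    void close();
  }

  // Shared no-op instance; safe because it holds no per-dataSource state.
  private static final Notifier NOOP = new Notifier()
  {
    @Override public void start() { /* nothing to do */ }
    @Override public void close() { /* nothing to do */ }
  };

  Notifier createSegmentHandoffNotifier(String dataSource)
  {
    return NOOP; // dataSource intentionally unused
  }
}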

@gianm gianm assigned fjy and gianm Nov 22, 2016
linbojin (Contributor):

Can this PR be used to auto-merge segments created by the Kafka indexing service?

dclim commented Nov 23, 2016

@linbojin it can be used to merge segments from the Kafka indexing service by using the ingest segment firehose, but the tasks still need to be submitted manually or by an external scheduler. Hopefully in the near future the 'auto' part will be implemented, possibly by generalizing #1998 to support IndexTask.

@fjy fjy assigned himanshug and unassigned gianm Nov 28, 2016
public void close() throws IOException
{
if (firehose != null) {
firehose.deleteTempFiles();
Contributor:

can deleteTempFiles() be called as part of the close() implementation itself rather than being called separately?

dclim (author):

I'll rework this as per the comments below


long startTime = System.nanoTime();
do {
deleteTempFiles();
Contributor:

why is this ever needed ?

Contributor:

nvm, I see that it is for the retry case.

try (Firehose delegateFirehose = delegateFactory.connect(parser)) {
while (delegateFirehose.hasMore()) {
File tmpFile = File.createTempFile("replayable-", null, tmpDir);
tmpFile.deleteOnExit();
Contributor:

given that tmpDir.deleteOnExit() has been called before, is this one still necessary?

dclim (author):

I believe tmpDir.deleteOnExit() will fail if there are still files in the directory; for reference: http://docs.oracle.com/javase/7/docs/api/java/io/File.html#delete()

totalBytes += cos.getCount();
}
}
break; // we succeeded so break out of retry loop
Contributor:

it might be simpler to read with a boolean isDone variable instead of while(true) in the outer loop, but this is just preference so up to you.

dclim (author):

Sure I can do that

throw new IllegalStateException("No more data!");
}

jsonParser = factory.createParser(files.get(fileIndex));
Contributor:

Will things be OK if the file returned from files.get(fileIndex) is empty? It is possible (a corner case, though) for the last file to not have any data in it.

dclim (author):

No, things won't work correctly if the file returned is empty (the iterator will throw an exception when called), but I don't think it's possible for this situation to happen (unless the firehose we're wrapping lies to us when we call delegateFirehose.hasMore()). See the logic in the ReplayableFirehose constructor for how these files are generated.

Contributor:

@dclim what about the case where delegate.hasMore() said true but the last set of events was unparseable, so nothing went into the last file?

dclim (author):

@himanshug will rework to handle this case, thanks!

dclim (author):

should be better now

}
final String version;
final DataSchema dataSchema;
if (!determineIntervals) {
Contributor:

reversing the condition makes this statement easier to read

dclim (author):

thanks

}
// determine intervals containing data and prime HLL collectors
final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = Maps.newHashMap();
int unparsed = 0;
Contributor:

this looks more like "thrownAway" than "unparsed", really

dclim (author):

sounds good

shardSpecForPublishing = shardSpec;
}
finally {
if (firehoseFactory instanceof Closeable) {
Contributor:

FirehoseFactory is generally not Closeable, and nowhere else in the code would it be closed. If ReplayableFirehoseFactory needs to be handled specially, then only IndexTask can use it, and something should be done in the code so that ReplayableFirehoseFactory does not accidentally get used elsewhere.

dclim (author):

Hm, the purpose of closing the factory is to delete the temporary cache files. Technically, this doesn't really need to be done, since we would expect the deleteOnExit() mechanism to handle cleanup when the process terminates. I added it since I thought it would be nice to clean up as soon as possible, but having a Closeable FirehoseFactory (understandably) seems to be adding a lot of uncertainty. I'll rework it to just clean up the temp files when the process terminates using deleteOnExit(), which should happen at pretty much the same time in indexing tasks.

{
private final JsonFactory factory = new DefaultObjectMapper(
new SmileFactory().enable(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES)
).getFactory();
Contributor:

will this be able to work with sketch objects defined in extensions? For example, if someone is using IndexTask to do re-indexing with IngestSegmentFirehose and a row has sketch objects, will they be serialized correctly?

dclim (author):

Great question, will test

dclim (author):

Oops, no this doesn't work with custom serializers; I have to use the Guice managed one through @Smile @JacksonInject; nice catch, thank you.
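A hedged sketch of the fix dclim describes: take the Guice-managed Smile ObjectMapper via Jackson injection instead of constructing a private one, so serializers registered by extensions (e.g. sketches) are available. The class name is illustrative; the annotations are the ones named in the comment.

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.guice.annotations.Smile;

public class ReplayableFirehoseFactorySketch
{
  private final ObjectMapper smileMapper;

  @JsonCreator
  public ReplayableFirehoseFactorySketch(
      @JacksonInject @Smile ObjectMapper smileMapper // injected mapper, aware of extension modules
  )
  {
    this.smileMapper = smileMapper;
    // smileMapper.getFactory() can then be used to create generators/parsers for the cache files.
  }
}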

@dclim dclim closed this Jan 3, 2017
@dclim dclim reopened this Jan 3, 2017
himanshug (Contributor):

I'm assuming this has been tested.

looks good besides #3611 (review)

I wish ReplayableFirehoseFactory could be configured with a maximum amount of disk space to use, instead of using unlimited disk, to make disk utilization a bit more predictable from a usability point of view. But that is probably for some other day and some other PR.

@himanshug himanshug closed this Jan 4, 2017
@himanshug himanshug reopened this Jan 4, 2017
}

try {
jsonParser.close();
Contributor:

UT is failing, you probably need a null check here.

himanshug (Contributor):

👍 overall after #3611 (review) fixed

dclim commented Jan 17, 2017

@himanshug oops, changed it from CloseQuietly(jsonParser) and forgot to re-run UT. Thanks for the review!

himanshug (Contributor):

👍

fjy commented Jan 18, 2017

👍

@fjy fjy merged commit ff52581 into apache:master Jan 18, 2017
@dclim dclim deleted the sans-hadoops branch January 18, 2017 23:18
dgolitsyn pushed a commit to metamx/druid that referenced this pull request Feb 14, 2017
* index task improvements

* code review changes

* add null check
hamlet-lee (Contributor):

@dclim is the bug #4147 related to this PR?

dclim commented Apr 5, 2017

@hamlet-lee no it's not, but I see you were able to figure it out
