IndexTask improvements #3611
Conversation
@dclim any benchmarks on how much faster this is?
@@ -187,7 +187,7 @@ This spec is used to generated segments with uniform intervals.
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no |
does hadoop index task need intervals?
I checked again, and it looks like it's almost coded not to need intervals, but I think there might be one set of configs that will fail without intervals. I'll test it out and fix it (if easy) or revert this (if non-trivial).
I fixed Hadoop tasks to work without intervals
Update: I split this code out from this review and reverted this to indicate Hadoop still needs intervals
|type|The task type, this should always be "index".|none|yes|
|firehose|Specify a [Firehose](../ingestion/firehose.html) here.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs (which can be forced by setting 'forceExtendableShardSpecs' in the tuning config).|false|no|
|skipFirehoseCaching|By default the IndexTask will fully read the supplied firehose to disk before processing the data. This prevents the task from doing multiple remote fetches and enforces determinism if more than one pass through the data is required. It also allows the task to retry fetching the data if the firehose throws an exception during reading. This requires sufficient disk space for the temporary cache.|false|no|
why not set this to true always?
ah nvm, for remote files it is better to cache
shardSpecForPublishing = shardSpec;
}
finally {
  if (Closeable.class.isAssignableFrom(firehoseFactory.getClass())) {
can we just close the firehose instead of the factory?
is it possible to move this code to where the firehose is created and call close there?
A design goal here was to keep the existing Firehose and FirehoseFactory interfaces without modification. So the first time a user calls `ReplayableFirehoseFactory.connect()`, it creates a new firehose which reads the delegate firehose into cached files. Once the ReplayableFirehose is read through, it's expected the user will close the firehose, which closes the filestream but leaves the cached files on disk (obviously, that's the whole point). The ReplayableFirehoseFactory maintains a reference to the firehose it created, so when `connect()` is called again, it returns the original firehose "rewound" and doesn't read from the delegate again.

So the purpose of closing the factory is really just to delete those temporary files to free up disk space early. The files are all marked with `deleteOnExit()`, so forgetting to close the factory isn't fatal, but closing it is nicer.
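To make that lifecycle easier to picture, here is a minimal sketch. The types below are simplified stand-ins, not Druid's actual Firehose/FirehoseFactory interfaces; it only illustrates the connect-once/replay-later behavior described above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for the caching behavior described above (not Druid's real interfaces):
// the first connect() drains the delegate once into a cache; later connect() calls replay
// the cache "rewound" instead of re-reading the delegate. Only closing the factory drops
// the cached data, mirroring how closing ReplayableFirehoseFactory frees the temp files.
class ReplayableSourceSketch
{
  private List<String> cache;                 // stands in for the on-disk temp files

  Iterator<String> connect(Iterator<String> delegate)
  {
    if (cache == null) {                      // first call: read through the delegate once
      cache = new ArrayList<>();
      while (delegate.hasNext()) {
        cache.add(delegate.next());
      }
    }
    return cache.iterator();                  // subsequent calls: replay from the cache
  }

  void close()                                // "factory" close: discard the cached data
  {
    cache = null;
  }
}
```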
Why not just `firehoseFactory instanceof Closeable`?
That's better, thanks
@JsonProperty("targetPartitionSize") int targetPartitionSize, | ||
@JsonProperty("rowFlushBoundary") int rowFlushBoundary, | ||
@JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, | ||
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, |
we can have some code for backwards compat to support the old names for a release or two, and make sure to comment that they are deprecated
I also changed the semantics of how `targetPartitionSize` and `numShards` work together because I found it a bit awkward: previously you had to set `targetPartitionSize` to `-1` if you wanted to use `numShards`. I made it so if you just leave out `targetPartitionSize` and set `numShards` it'll work, but I have a bounds check on `targetPartitionSize` to not be negative. I guess I can special-case -1 for backwards compatibility.
Backwards compatibility is important, since this is not a major release. Even in majors it's nice to be compatible unless there's a really strong reason not to. So let's accept either null or -1 as meaning "don't do this".
Sounds good
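A small sketch of the compatibility rule agreed on here; the helper names are hypothetical, not the PR's actual code. Both null and the legacy -1 sentinel mean "not specified", and setting both options at once is rejected.

```java
// Hypothetical helpers illustrating the agreed-upon semantics (not the PR's code):
// null and the legacy -1 both mean "not specified"; specifying both targetPartitionSize
// and numShards is an error, and targetPartitionSize must be positive when given.
public class PartitionSpecCompat
{
  static Integer normalize(Integer value)
  {
    return (value == null || value == -1) ? null : value;
  }

  static void validate(Integer targetPartitionSize, Integer numShards)
  {
    Integer size = normalize(targetPartitionSize);
    Integer shards = normalize(numShards);
    if (size != null && shards != null) {
      throw new IllegalArgumentException("targetPartitionSize and numShards cannot both be set");
    }
    if (size != null && size <= 0) {
      throw new IllegalArgumentException("targetPartitionSize must be positive");
    }
  }
}
```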
private ReplayableFirehose firehose;

@JsonCreator
public ReplayableFirehoseFactory(
is this firehose documented?
No, as I mentioned in the initial comment, I wasn't sure whether or not to document it because I don't know how much use it'll have outside of IndexTask which uses it by default. Ah, I'll just write docs for it.
IMO it makes sense to leave internal-use decorator sort of things like this undocumented. Docs would just be confusing, since we don't expect anyone to actually use it.
Agreed
@gianm is "no docs = for internal use" is an official policy?
@leventov no, but if the docs provide no value, then there's no point to add them
We do have an official policy that things that are documented will be stable within a 0.X release line (see versioning.md). Things that aren't documented might change without warning in any release, possibly even disappearing.
I think if we don't intend for something to be an external API, it's best to leave it undocumented, since otherwise we create an API stability burden we don't really need.
@gianm I cannot find such statements in https://github.com/druid-io/druid/blob/d981a2aa02aedfdf18e0bed5848e704bd9597563/docs/content/development/versioning.md.
This policy seems strange to me, because it inhibits the good practice of documenting code.
@leventov there have been countless experiences where providing too much documentation just served to confuse people
dimensions = Ordering.natural().immutableSortedCopy(dimensionScratch);

if (counter == 0) {
  log.warn("Firehose contains no events!");
can we print the name of the file as well?
We can't really print any useful identifier here since it's abstracted by the Firehose interface. It may not even be a file.
@@ -0,0 +1,34 @@
package io.druid.segment.realtime.plumber;
header
@fjy regarding benchmarks, the performance improvement is highly dependent on the number of intervals/partitions generated. Here's some data from some of my tests:
- Ingesting 6M rows of TPCH in 10 shards:
- Ingesting 6M rows of TPCH in 84 shards:
- Ingesting 30M rows of TPCH in ~50 shards:
Hi, S3 timeouts are currently giving us major headaches trying to run backfills under 0.9.1.1. We basically cannot scale up our indexing capacity effectively because of this issue. What would be the prospect of applying this PR onto 0.9.1.1 if we wanted to try it out in a private build? Does it depend on other post 0.9.1.1 changes? Perhaps just teasing out the ReplayableFirehose stuff since that looks like the main part that would help us?
@ryryguy the index task is not recommended for batch ingestion of files greater than 1GB. You should be using hadoop based batch indexing. See https://imply.io/docs/latest/ingestion-batch
@fjy our source files are basically right around 1GB. Some are over (up to 1.6 GB), some are under (down to 500 MB). I'm not sure if the S3 timeouts are correlated to ingestion file size - maybe somewhat, but when we experience periods where the failure rate approaches 100%, that is definitely including files that are well under 1GB. We briefly took a shot at setting up hadoop based batch indexing and couldn't get it to work. (We have basically zero experience with hadoop on our team...) Since it also reads files from S3, we weren't sure it would really address the problem. But if you think it might help, we can take another shot at it.
@ryryguy The hadoop index task does have automatic retries, at least.
@ryryguy I believe it should be possible to patch this onto 0.9.1.1, keeping the following in mind:
That's all I can think of right now but I may have missed something. Use at your own risk :) as @fjy mentioned, the recommendation for most use cases is using a Hadoop cluster for batch ingestion since it will scale and process files in parallel whereas the index task will not. The Hadoop framework retries failed tasks so that should help with transient S3 issues.
try (CountingOutputStream cos = new CountingOutputStream(new FileOutputStream(tmpFile));
     JsonGenerator generator = factory.createGenerator(cos)) {

  while (delegateFirehose.hasMore() && cos.getCount() < getMaxTempFileSize()) {
is the hasMore check here required?
what happens if the maxtempfilesize is incorrectly configured, such as a negative value?
actually that should be fine
👍
Unit tests won't pass until #3483 is in
}

return id;
return id != null ? id : String.format("index_%s_%s", makeDataSource(ingestionSchema), new DateTime().toString());
Don't need toString()
thanks
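For reference, a tiny illustration of the point (example values are made up): `%s` already calls `toString()` on its argument, so the explicit call is redundant.

```java
import org.joda.time.DateTime;

public class FormatExample
{
  public static void main(String[] args)
  {
    DateTime now = new DateTime();
    // %s formats any Object via String.valueOf(), which calls toString() itself,
    // so both lines produce the same result.
    System.out.println(String.format("index_%s_%s", "wikipedia", now));
    System.out.println(String.format("index_%s_%s", "wikipedia", now.toString()));
  }
}
```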
.isPresent();

final FirehoseFactory delegateFirehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().isSkipFirehoseCaching()
Maybe use an if-else instead of such a big ternary expression?
sure
File tmpDir = Files.createTempDir();
if (DELETE_TEMP_FILES) {
  tmpDir.deleteOnExit();
`Firehose` is `Closeable`, maybe delete the tmpDir in `close()`?
Deleting the directory in `ReplayableFirehose.close()` breaks the functionality of the firehose since it needs to retain the cache files between uses. We could delete the directory in `ReplayableFirehoseFactory.close()`, but I don't think it really makes a difference deleting the directory immediately vs. when the process exits.
@dclim ok, thanks. Maybe you could add a comment to the code explaining this?
Sounds good
log.info(
    "Finished reading from firehose in [%,dms], [%,d] events parsed, [%,d] bytes written, [%,d] events unparseable",
    System.currentTimeMillis() - ingestStartMillis,
Maybe report this time as a metric somehow?
Might be interesting, but probably not necessary in this PR
  tmpDir.deleteOnExit();
}

long ingestStartMillis = System.currentTimeMillis();
Maybe use `nanoTime()`? It's much more precise, and that may count at least in some cases.
Sounds good
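A minimal sketch of the suggested change (names are illustrative): measure elapsed time with the monotonic `nanoTime()` clock and convert to milliseconds for logging.

```java
import java.util.concurrent.TimeUnit;

public class ElapsedTimeSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    long startNanos = System.nanoTime();       // monotonic clock, unaffected by wall-clock changes
    Thread.sleep(25);                          // stands in for reading the firehose
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    System.out.printf("Finished reading from firehose in [%,dms]%n", elapsedMs);
  }
}
```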
  jsonParser = factory.createParser(files.get(fileIndex));
  it = jsonParser.readValuesAs(Row.class);
}
catch (IOException e) {
If catching just `IOException`, `Throwables.propagate()` is a synonym for `throw new RuntimeException(e)`, which should be preferred IMO because it is less library-ish.
fair point, thanks
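A short illustration of the preference stated above (method names are hypothetical): when only IOException can be thrown, wrapping it directly has the same effect as Guava's `Throwables.propagate(e)`.

```java
import java.io.IOException;

public class WrapIoException
{
  static void createParserOrFail()
  {
    try {
      mightThrow();                      // stands in for factory.createParser(...)
    }
    catch (IOException e) {
      // Equivalent to Throwables.propagate(e) for a checked IOException,
      // without pulling a Guava utility into the call site.
      throw new RuntimeException(e);
    }
  }

  static void mightThrow() throws IOException
  {
    // placeholder body
  }
}
```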
@Override
public void close() throws IOException
{
  CloseQuietly.close(jsonParser);
If `close()` throws IOException, why consume the possible IOException in `jsonParser`?
private static final Logger log = new Logger(NoopSegmentHandoffNotifierFactory.class);

@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
`dataSource` is not used in the returned `SegmentHandoffNotifier`. Not sure if it is as intended or not. If it is not, maybe cache a SegmentHandoffNotifier in a static field?
I'll cache it, thanks!
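A generic sketch of the suggestion (the interface below is a simplified stand-in, not Druid's SegmentHandoffNotifier): when the factory ignores its argument and the product is stateless, a single instance can be cached in a static final field.

```java
public class NoopNotifierFactorySketch
{
  interface Notifier { }                                     // stand-in for SegmentHandoffNotifier

  private static final Notifier NOOP = new Notifier() { };   // created once, shared by all callers

  public Notifier createNotifier(String dataSource)
  {
    return NOOP;                                             // dataSource intentionally unused
  }
}
```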
Can this PR be used to auto-merge segments created by the Kafka indexing service?
@linbojin it can be used to merge segments from the Kafka indexing service by using the ingest segment firehose, but the tasks still need to be submitted manually or by an external scheduler. Hopefully in the near future the 'auto' part will be implemented, possibly by generalizing #1998 to support IndexTask.
public void close() throws IOException
{
  if (firehose != null) {
    firehose.deleteTempFiles();
can deleteTempFiles() be called as part of the close() implementation itself and not called separately?
I'll rework this as per the comments below
long startTime = System.nanoTime();
do {
  deleteTempFiles();
why is this ever needed?
nvm, I see that it is for the retry case.
try (Firehose delegateFirehose = delegateFactory.connect(parser)) {
  while (delegateFirehose.hasMore()) {
    File tmpFile = File.createTempFile("replayable-", null, tmpDir);
    tmpFile.deleteOnExit();
given that tmpDir.deleteOnExit() has been called before, is this one still necessary?
I believe tmpDir.deleteOnExit() will fail if there are still files in the directory; for reference: http://docs.oracle.com/javase/7/docs/api/java/io/File.html#delete()
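A runnable sketch of why both registrations matter (variable names mirror the excerpt above): `deleteOnExit()` processes entries in reverse registration order, so the files are removed before the directory, and `File.delete()` only succeeds on an empty directory.

```java
import com.google.common.io.Files;

import java.io.File;
import java.io.IOException;

public class DeleteOnExitOrderingSketch
{
  public static void main(String[] args) throws IOException
  {
    File tmpDir = Files.createTempDir();
    tmpDir.deleteOnExit();                                    // deleted last (reverse registration order)

    File tmpFile = File.createTempFile("replayable-", null, tmpDir);
    tmpFile.deleteOnExit();                                   // deleted first, leaving the directory empty

    // Without the per-file call, the directory would still contain tmpFile at shutdown
    // and File.delete() would silently fail on the non-empty directory.
  }
}
```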
    totalBytes += cos.getCount();
  }
}
break; // we succeeded so break out of retry loop
it might be simpler to read with a boolean isDone variable instead of while(true) in the outer loop, but this is just preference so up to you.
Sure I can do that
  throw new IllegalStateException("No more data!");
}

jsonParser = factory.createParser(files.get(fileIndex));
Will things be OK if the file returned from files.get(fileIndex) is empty? It is possible (a corner case, though) for the last file to not have any data in it.
No, things won't work correctly if the file returned is empty (the iterator will throw an exception when called), but I don't think it's possible for this situation to happen (unless the firehose we're wrapping lies to us when we call delegateFirehose.hasMore()). See the logic in the ReplayableFirehose constructor for how these files are generated.
@dclim what about the case when delegate.hasMore() said true but the last set of events was unparseable, so nothing went into the last file?
@himanshug will rework to handle this case, thanks!
should be better now
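A minimal sketch of the guard described above (names are hypothetical, not the PR's code): a cache file is only kept for replay if at least one event was actually written to it, so a trailing file that saw only unparseable rows never reaches the JSON reader.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class EmptyCacheFileGuard
{
  public static void main(String[] args) throws IOException
  {
    List<File> files = new ArrayList<>();
    File tmpFile = File.createTempFile("replayable-", null);

    int eventsWrittenToFile = 0;          // would be incremented while serializing parsed rows

    if (eventsWrittenToFile > 0) {
      files.add(tmpFile);                 // safe to replay later
    } else {
      tmpFile.delete();                   // only unparseable rows; drop the empty file
    }
  }
}
```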
}
final String version;
final DataSchema dataSchema;
if (!determineIntervals) {
reversing the condition makes this statement easier to read
thanks
}
// determine intervals containing data and prime HLL collectors
final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = Maps.newHashMap();
int unparsed = 0;
this looks more like "thrownAway" than "unparsed", really
sounds good
shardSpecForPublishing = shardSpec;
}
finally {
  if (firehoseFactory instanceof Closeable) {
FirehoseFactory is generally not Closeable and nowhere else in the code would it be closed. If ReplayableFirehoseFactory needs to be handled specially then only IndexTask can use it, and something should be done in the code so that ReplayableFirehoseFactory does not accidentally get used elsewhere.
Hm, the purpose of closing the factory is to delete the temporary cache files. Technically, this doesn't really need to be done, since we would expect the `deleteOnExit()` mechanism to handle cleanup when the process terminates. I added it since I thought it would be nice to clean up as soon as possible, but having a Closeable FirehoseFactory (understandably) seems to be adding a lot of uncertainty. I'll rework it to just clean up the temp files when the process terminates using `deleteOnExit()`, which should happen at pretty much the same time in indexing tasks.
{
  private final JsonFactory factory = new DefaultObjectMapper(
      new SmileFactory().enable(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES)
  ).getFactory();
Will this be able to work with sketch objects defined in extensions? For example, if someone is using IndexTask to do re-indexing with IngestSegmentFirehose and a row has sketch objects, will they be serialized correctly?
Great question, will test
Oops, no this doesn't work with custom serializers; I have to use the Guice-managed one through `@Smile @JacksonInject`; nice catch, thank you.
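A sketch of what the fix might look like; the annotation placement and names here are assumptions based on the comment above, not code copied from the PR. The idea is to let Jackson inject the process-wide, extension-aware ObjectMapper instead of constructing a private one, so custom serializers (e.g. sketch aggregators) registered by extensions are honored.

```java
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Assumed shape only: the Guice-bound, extension-aware Smile mapper is injected
// (Druid's @Smile qualifier is elided here to avoid guessing its package).
public class ReplayableFirehoseFactorySketch
{
  private final ObjectMapper smileMapper;

  @JsonCreator
  public ReplayableFirehoseFactorySketch(
      @JsonProperty("maxTempFileSize") Long maxTempFileSize,
      @JacksonInject /* @Smile */ ObjectMapper smileMapper
  )
  {
    this.smileMapper = smileMapper;     // used for serializing rows to the temp cache files
  }
}
```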
I'm assuming this has been tested. Looks good besides #3611 (review). I wish ReplayableFirehoseFactory could be configured to take a max possible disk space instead of using unlimited disk, to make disk utilization a bit more predictable from a usability point of view. But that is probably for some other day, some other PR.
}

try {
  jsonParser.close();
UT is failing, you probably need a null check here.
👍 overall after #3611 (review) fixed
@himanshug oops, changed it from CloseQuietly(jsonParser) and forgot to re-run UT. Thanks for the review!
👍
👍
* index task improvements
* code review changes
* add null check
@hamlet-lee no it's not, but I see you were able to figure it out
This PR introduces a number of improvements to the IndexTask (non-Hadoop batch indexer). The primary motivation of improving IndexTask is with the view that it can be used with something like #1998 for automatic merging and splitting of segments for users without Hadoop. This would replace the functionality of the MergeTask, which is limited by the fact that it can only operate on NoneShardSpec-type segments. A secondary motivation is to improve the performance/stability of IndexTask for users who don't run a Hadoop cluster.

Main changes:

- The data is now read through only once if `numShards` and `intervals` are provided, otherwise twice. Performance gains will vary based on the number of intervals/shards, but in all cases are better than the previous implementation and typically significantly better. Performance compared to the Hadoop index task running locally is typically equivalent or better. The Appenderator implementation is now being used for index generation.
- `intervals` no longer needs to be explicitly provided but can be determined, which could make things easier for some use cases.
- Added an `appendToExisting` flag which will generate segments that are additional partitions added to the latest version of the existing segments, effectively 'appending data' into the datasource. The set of segments to be appended to must be of an extendable type (i.e. they can't be NoneShardSpecs).

This PR depends on #3483 being merged and I don't expect tests to pass until that happens.