
Behavior of index_parallel with appendToExisting=false and no bucketIntervals in GranularitySpec is surprising #6989

Closed
glasser opened this issue Feb 2, 2019 · 9 comments


glasser commented Feb 2, 2019

We're experimenting with native batch ingestion on our 0.13-incubating cluster for the first time (using a custom firehose that reads files saved to GCS by Secor, and a custom InputRowParser).

There was a period of a week where the data source had no data. We ran batch ingestion (index_parallel) over one particular hour (5am-6am on December 16th) and it successfully ingested that hour — a segment showed up in the coordinator, it could be queried, etc. (Our segment granularity is HOUR.)

Then we ran it again on the entire 24 hours of December 16th. It ran 24 subtasks (our firehose divides up by hour) and ingested the full day, yay!

Except when we look in the coordinator, it now lists 2 segments with identical sizes for the 5am hour that we first tested with. Both of them are listed with the same version, which is from the first batch ingestion, not the version that the other 23 segments received from the second batch ingestion.

We did not explicitly specify appendToExisting in our ioConfig, but I believe the default is false, and looking at the task payload it is indeed expanded to false.

Are we doing something wrong if our goal is to replace existing segments? Isn't that what appendToExisting: false should do?

The bad hour in the coordinator:
[screenshot]

The good hour:
[screenshot]


glasser commented Feb 5, 2019

OK, I think I figured this out. Will test, and then will transform this issue into a docs suggestion and file a PR with a fix.

glasser changed the title from "Native batch ingestion didn't replace existing segment" to "Behavior of index_parallel with appendToExisting=false and no bucketIntervals in GranularitySpec is surprising" on Feb 6, 2019

glasser commented Feb 6, 2019

What I was missing here is that native batch parallel ingestion effectively acts as if appendToExisting is true unless you specify explicit intervals in the GranularitySpec.

This seems to be different from both Hadoop batch ingestion and the Local Index Task (including index_parallel with a non-splittable FirehoseFactory) — all of these (if I understand correctly) will run an additional phase to calculate the intervals if they are not provided.

This confused me. I'd like to help fix it!

I think we should consider the current behavior a bug, and the top-level parallel index task should error if all of the following are true (a sketch of such a check follows the list):

  • Running index_parallel
  • FirehoseFactory.isSplittable() returns true (or possibly leave this condition out)
  • appendToExisting == false
  • granularitySpec does not specify intervals
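
For concreteness, here's a minimal sketch of what such a check could look like; the class, method, and parameter names are purely illustrative, not a proposed API:

```java
// Illustrative only: the condition under which the proposal above would fail fast.
final class ParallelIndexSpecValidation
{
  static void checkReplaceSemantics(
      boolean firehoseIsSplittable,   // FirehoseFactory.isSplittable()
      boolean appendToExisting,       // ioConfig.appendToExisting (defaults to false)
      boolean hasExplicitIntervals    // granularitySpec specifies intervals
  )
  {
    if (firehoseIsSplittable && !appendToExisting && !hasExplicitIntervals) {
      throw new IllegalArgumentException(
          "index_parallel with appendToExisting=false requires explicit intervals in "
              + "granularitySpec; set appendToExisting=true to keep the current append-like behavior"
      );
    }
  }
}
```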

While this would be a backwards-incompatible change in 0.14, native batch ingestion is still a very new feature and this behavior is very surprising — and there's a trivial workaround of setting appendToExisting to true if you like the current behavior.

If that's not the right change, we could fix the docs instead. I'd update the doc of appendToExisting in native_tasks.md to mention that it is effectively true if intervals aren't specified, and the docs of intervals in ingestion_spec should mention that native parallel tasks care about them more.

(I suppose one could also make parallel indexing do two scans in this case, but I certainly would have been happier being asked to add one line to my spec than having my ingestion take twice as long, and the two-scan approach is more complex.)

I'm happy to implement either the new error or the docs update, whichever is best.
Thoughts (@jihoonson)?

jihoonson commented:

@glasser thank you for finding this! I agree with you that the behavior of indexParallelTask is supposed to be the same as (or at least similar to) indexTask or hadoopIndexTask. So, I think this is a bug. indexParallelTask is expected to overwrite existing segments unless appendToExisting is explicitly set to true.

I think it's still possible to avoid another scan even if intervals are not given. That is, we can find intervals and generate segments at the same time. The algorithm would be:

  1. Find the bucketed interval for an input row. This can be done with interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
  2. Check that the task has a valid lock for that interval. If it doesn't have a lock yet, it should request one. If it fails to get a lock, or the lock has already been revoked, the task fails.
  3. Create a segmentId with the version of the lock.
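
To make those steps concrete, here's a rough standalone paraphrase; GranularitySpec, InputRow, TaskLock, and Interval are the existing Druid/joda-time classes, but the class itself, tryAcquireLock(), and the id-building at the end are hypothetical placeholders, not the actual task code:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.joda.time.Interval;

// Sketch of dynamic per-interval lock acquisition during ingestion (illustration only).
class DynamicSegmentAllocationSketch
{
  private final GranularitySpec granularitySpec;          // from the ingestion spec
  private final String dataSource;
  private final Map<Interval, TaskLock> heldLocks = new HashMap<>();

  DynamicSegmentAllocationSketch(GranularitySpec granularitySpec, String dataSource)
  {
    this.granularitySpec = granularitySpec;
    this.dataSource = dataSource;
  }

  String allocateForRow(InputRow inputRow)
  {
    // 1. Bucket the row's timestamp into a segment-granularity interval.
    final Interval interval =
        granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());

    // 2. Make sure the task holds a valid (non-revoked) lock for that interval.
    TaskLock lock = heldLocks.get(interval);
    if (lock == null) {
      lock = tryAcquireLock(interval);   // hypothetical: request a lock at runtime
      if (lock == null) {
        throw new IllegalStateException("Could not get a lock for interval " + interval);
      }
      heldLocks.put(interval, lock);
    }
    if (lock.isRevoked()) {
      throw new IllegalStateException("Lock for interval " + interval + " was revoked");
    }

    // 3. Build a segment id from the lock's version (real code would also assign a partition number).
    return dataSource + "_" + interval + "_" + lock.getVersion();
  }

  private TaskLock tryAcquireLock(Interval interval)
  {
    // Placeholder: in the real task this would go through the task-action client.
    throw new UnsupportedOperationException("illustration only");
  }
}
```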

So, this would be mostly about allocating segmentIds and getting task locks. I think it would be better to modify ParallelIndexSupervisorTask.allocateNewSegment() rather than SegmentAllocateAction, because SegmentAllocateAction is designed for appending and is already complex enough.

In summary, we may want to change this block to call taskClient.allocateSegment() when explicitIntervals is false. Also, ParallelIndexSupervisorTask.allocateNewSegment() needs to be modified to implement the above algorithm.
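
Roughly, the dispatch in that block could look like the following; only taskClient.allocateSegment() is named above, and the method shape, helper name, and signature below are illustrative guesses rather than the actual code:

```java
// Hedged sketch of choosing between the two allocation paths (illustration only).
private SegmentIdentifier allocate(InputRow row)
{
  if (explicitIntervals) {
    // Existing path: intervals were given up front and locked in isReady().
    return allocateWithPreAcquiredLocks(row);                                 // hypothetical helper
  } else {
    // New path: ask the supervisor task, which finds the interval and lock dynamically.
    return taskClient.allocateSegment(supervisorTaskId, row.getTimestamp());  // guessed signature
  }
}
```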

What do you think?


glasser commented Feb 6, 2019

I think advising on changes to locking semantics is beyond my understanding of Druid (which expanded immensely today while researching this issue). What you describe seems reasonable, but I don't feel like I can confidently say that changing from a static set of locks determined at the beginning of the task to a dynamic set of locks is safe.

That said, I could try to implement your suggestion even if I can't evaluate its correctness!

jihoonson commented:

Thank you! Looking forward to your PR.

I think it's safe to get locks dynamically. The indexTask already gets locks one by one per interval, though it happens in a tight loop.
Also, if there's something wrong with the task locks, publishing segments will fail, which makes the task fail. Check out SegmentTransactionalInsertAction, which is called in SinglePhaseParallelIndexTaskRunner.publish().


glasser commented Feb 6, 2019

Hi @jihoonson. A couple random questions while I work on this:

  1. Where should tests of this new logic end up? And how do I actually run parts of the Druid test suite? I'm not very experienced with Maven — I know how to run mvn install to run all of the tests for all of Druid, but not anything more specific. (I use IntelliJ if that helps.)

  2. If it's OK to dynamically add locks one by one as the task runs, why do the local and hadoop indexing tasks do an initial scan to determine all the intervals at once? Do they need to do that scan for some other reason anyway?

  3. General batch ingestion/segment replacement question: if you're using batch ingestion (of any kind: Hadoop, native, local) with granularitySpec interval specified and appendToExisting false, to re-ingest to an interval that already contains data, but there is a time chunk of the data source's segment granularity that has no row in your batch ingestion run, what will happen to the data in that time chunk? It seems to me that nothing will happen because I haven't seen anything that creates empty segments for a time chunk with no data, and so there's no segment to overshadow the old segment. Is that expected? Is there a good way to say "replace this interval of time with data from this batch job, including dropping segments from time chunks if there's nothing there"? We're considering using batch ingestion with the ingestSegment firehose and filtering in order to retain only specific rarer kinds of data past a certain distance in the past, and it's possible to imagine that that data might be missing for an entire hour here and there.

jihoonson commented:

@glasser

1. Where should tests of this new logic end up? And how do I actually run parts of the Druid test suite? I'm not very experienced with Maven — I know how to run mvn install to run all of the tests for all of Druid, but not anything more specific. (I use IntelliJ if that helps.)

I would recommend using IntelliJ or any other IDE you prefer. If you want to do it in the terminal, you can run mvn test -Dtest=TestClass and mvn verify -P integration-tests -Dit.test=TestClass for unit tests and integration tests, respectively.

2. If it's OK to dynamically add locks one by one as the task runs, why do the local and hadoop indexing tasks do an initial scan to determine all the intervals at once? Do they need to do that scan for some other reason anyway?

I think it's because of #4550. Those classes were written before #4550, and at that time there was no concept of revoking locks. As a result, if two or more tasks got locks one by one dynamically, they might get stuck in the middle of ingestion. Moreover, it could cause a deadlock if they blocked each other. However, now higher-priority tasks can preempt lower-priority tasks.

3. General batch ingestion/segment replacement question: if you're using batch ingestion (of any kind: Hadoop, native, local) with granularitySpec interval specified and appendToExisting false, to re-ingest to an interval that already contains data, but there is a time chunk of the data source's segment granularity that has no row in your batch ingestion run, what will happen to the data in that time chunk? It seems to me that nothing will happen because I haven't seen anything that creates empty segments for a time chunk with no data, and so there's no segment to overshadow the old segment. Is that expected? Is there a good way to say "replace this interval of time with data from this batch job, including dropping segments from time chunks if there's nothing there"? We're considering using batch ingestion with the ingestSegment firehose and filtering in order to retain only specific rarer kinds of data past a certain distance in the past, and it's possible to imagine that that data might be missing for an entire hour here and there.

Good question. I don't think we currently support that kind of replacement. But maybe it's worth supporting.


glasser commented Feb 9, 2019

I started! Another question about locking. In allocateNewSegment there's this comment:

List locks whenever allocating a new segment because locks might be revoked and no longer valid.

But if I am reading the implementation of TaskLockbox and LockListAction correctly, revoked locks will show up in the response to a LockListAction. Should that stream have a filter(taskLock -> !taskLock.isRevoked()) in it? Otherwise you might as well just remember the TaskLocks that were set up in isReady.
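
For reference, this is the kind of filtering I mean; LockListAction, the action-client submit(), and TaskLock.isRevoked() are the existing Druid API, while the surrounding method is just illustration:

```java
// Illustration: list the task's locks and drop any that have been revoked.
private List<TaskLock> validLocks(TaskToolbox toolbox) throws IOException
{
  return toolbox.getTaskActionClient()
                .submit(new LockListAction())
                .stream()
                .filter(taskLock -> !taskLock.isRevoked())
                .collect(Collectors.toList());
}
```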

jihoonson commented:

@glasser sorry, just saw your last question. I left some comments in the PR.

jihoonson pushed a commit that referenced this issue Feb 20, 2019 (…7046)

* index_parallel: support !appendToExisting with no explicit intervals

This enables ParallelIndexSupervisorTask to dynamically request locks at runtime
if it is run without explicit intervals in the granularity spec and with
appendToExisting set to false.  Previously, it behaved as if appendToExisting
was set to true, which was undocumented and inconsistent with IndexTask and
Hadoop indexing.

Also, when ParallelIndexSupervisorTask allocates segments in the explicit
interval case, fail if its locks on the interval have been revoked.

Also make a few other additions/clarifications to native ingestion docs.

Fixes #6989.

* Review feedback.

PR description on GitHub updated to match.

* Make native batch ingestion partitions start at 0

* Fix to previous commit

* Unit test. Verified to fail without the other commits on this branch.

* Another round of review

* Slightly scarier warning
jihoonson pushed a commit to jihoonson/druid that referenced this issue Feb 20, 2019 (apache#7046)
jihoonson added a commit that referenced this issue Feb 23, 2019 (#7113)