index_parallel: support !appendToExisting with no explicit intervals #7046
Conversation
Work in progress. I need to figure out how to effectively run the integration test suite and add to ITParallelIndexTest a second ingestion that replaces segments from the first ingestion. (It's unclear to me how I can use mvn to run the integration-tests while skipping unit tests (some of which fail due to requiring S3 credentials etc, and which are just slow), and while I got …)
For reference: I ended up running the integration test by running:
The PATH update put a Homebrew openssl on my path; the run_cluster.sh scripts didn't properly set up certs with the OSX default openssl. (They also don't use …)
Ok, this is ready for review. There are a few FIXME comments where I have questions. This implements a suggestion from @jihoonson.
(Added/tweaked some docs while I was at it.)
@glasser thanks! I'll check soon.
@glasser thank you for the nice work! I left some comments. In addition to them, would you please add a unit test too? You have added a nice integration test, but the unit test would be cool because it's easier to debug. I think you can add some in ParallelIndexSupervisorTaskTest.
@@ -28,6 +28,9 @@ Druid currently has two types of native batch indexing tasks, `index_parallel` w
in parallel on multiple MiddleManager nodes, and `index` which will run a single indexing task locally on a single
MiddleManager.

To run either kind of native batch indexing task, write an ingestion spec as specified below. Then POST it to the
`/druid/indexer/v1/task` endpoint on your Overlord machine, or use the `post-index-task` script included with Druid.
Please add a link to http://druid.io/docs/latest/operations/api-reference.html#tasks for details of the API.
// List locks whenever allocating a new segment because locks might be revoked and no longer valid.
final Map<Interval, String> versions = toolbox
    .getTaskActionClient()
    .submit(new LockListAction())
    .stream()
    // FIXME note that the next line is new in this commit --- it's not relevant for fixing #6989,
    // it just seems like a bug fix to make the code match its comment
Thanks for trying to fix this! I think it's not a bug, but it looks useful because it would make the task fail earlier if it doesn't have a valid lock.
The intention of returning revoked locks in LockListAction is that the caller should be notified about revoked locks, so that they can handle it properly. I think we should fail here immediately if any lock is revoked because there's no way to get a new lock once the lock is revoked.
The reason I think the current code doesn't match its comment is that if we didn't care about checking for revoked locks, we wouldn't need to run LockListAction again – we could just remember the versions we got when we took the locks out at the beginning of the task.
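For illustration, a rough sketch of the fail-fast behavior being discussed (not the actual patch; TaskLock#isRevoked() and the exact error handling are assumptions here):

// Fail fast if any lock returned by LockListAction has been revoked, then build
// the interval -> version map used when allocating segments.
final List<TaskLock> locks = toolbox.getTaskActionClient().submit(new LockListAction());
final Map<Interval, String> versions = new HashMap<>();
for (TaskLock lock : locks) {
  if (lock.isRevoked()) {
    throw new ISE("Lock for interval[%s] was revoked", lock.getInterval());
  }
  versions.put(lock.getInterval(), lock.getVersion());
}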
throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
version = findVersion(versions, interval);
if (version == null) {
  // FIXME I'm preserving the existing error message here, but it seems like the message should
If we check revoked locks above, I think it's fine to keep the original message.
toolbox.getTaskActionClient().submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)),
"Cannot acquire a lock for interval[%s]", interval
);
if (!interval.equals(lock.getInterval())) {
I think it's fine to skip this check. The lock is created using the given interval, and so this must not happen.
Instead, maybe we want to check that this is the first partition of this interval. Probably worth checking that Counters.incrementAndGetInt(partitionNumCountersPerInterval, interval) returns 1.
BTW, partitionNum should actually be Counters.getAndIncrementInt(). I'm fine with fixing it in this PR if you want.
I'm not sure what you mean by the third comment: you mean that the method's name should be changed from Counters.incrementAndGetInt to Counters.getAndIncrementInt?
Ah sorry. I mean, we should add a new method which gets first and then increments the counter.
Ah, to make it start at 0 rather than 1? OK, done. Note that this was the only caller of Counters.incrementAndGetInt: should I delete it, or leave it around for extension compatibility reasons?
I think no one is using it. You should be fine to remove it.
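A minimal sketch of the get-then-increment counter being discussed, so that the first partition of an interval is numbered 0 (the real Counters class in Druid may look different; this is just the idea):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class Counters
{
  static <K> int getAndIncrementInt(ConcurrentHashMap<K, AtomicInteger> counters, K key)
  {
    // First call for a key returns 0, the next call returns 1, and so on.
    return counters.computeIfAbsent(key, k -> new AtomicInteger(0)).getAndIncrement();
  }
}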
new NumberedShardSpec(partitionNum, 0)
);
}
private static String findVersion(Map<Interval, String> versions, Interval interval)
private static @Nullable String findVersion(Map<Interval, String> versions, Interval interval)
nit: we usually add the annotation above the method.
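That is, the annotation goes on its own line above the signature, e.g. (illustrative only, body elided):

@Nullable
private static String findVersion(Map<Interval, String> versions, Interval interval)
{
  // ...
}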
);
}
catch (Exception e) {
  throw Throwables.propagate(e);
nit: this is deprecated. Please use throw new RuntimeException(e) instead.
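The suggested replacement, sketched (the surrounding try block is elided here):

catch (Exception e) {
  // Wrap the checked exception instead of using the deprecated Guava Throwables.propagate.
  throw new RuntimeException(e);
}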
I'm not quite sure how to write a test in ParallelIndexSupervisorTaskTest to fully exercise this. testWithoutInterval does exercise the new just-in-time locking code, but it doesn't explicitly validate that new versions of segments would be created. Is that adequate, or should I figure out a way to have a test that ingests some data twice and see different versions the second time? I can't see how to get access to the metadata of published segments. Something in createTaskToolbox? There's a lot of nulls there :)
Yeah, nulls.. We need to tidy it up later. I think you can list published segments from the metadata store. You may need to expose … Side note: …
OK, I fixed the counter to start at 0 (and removed the old helper), and added a unit test which fails without the other changes. (I've been doing updates as separate commits but updating the PR description to match the full change, so the PR desc should work fine as a commit message for a squash of this PR. Let me know if you have a different preference for how to respond to code review.)
Thanks for updating! I think commit messages are fine. I'll take another look. In the meantime, would you please check this CI failure?
I fixed that one already; the current failures are some odd Maven issues like … which I don't think are my fault? Unfortunately, as a non-project-owner, I think the only way I can rerun these CI tasks is to push a new commit, and I can't even kill other subjobs in the old job.
Ah yeah #6581 seems to have broken master.
CI now passing.
This does now work, but I'm now realizing I would never actually want to use it in my use case, since if your input data accidentally contains one stray row from outside the interval you're trying to replace, it'll delete a ton of data. (And we just found a bug that could lead to rows derived from our Kafka backups being outside the interval you'd think they'd be in.) Maybe I should add some scarier warnings to the docs?

Part of me does feel like it would be easier to reason about if non-appending batch ingestion always required you to specify a target interval and always replaced all segments inside that interval on success (like I was thinking about in part 3 of #6989 (comment)), but I'm sure there are use cases where that isn't desired. The current semantics of "non-appending batch ingestion replaces any data that happens to be within segmentGranularity of specific rows produced by the firehose" seems harder for me to reason about.
@glasser hmm, good point. I think it would be fine to add a scary warning to the docs for now. Probably worth discussing how we can improve the native task behavior to be more intuitive.
@glasser thank you for the quick update!
final List<DataSegment> newSegments =
    getStorageCoordinator().getUsedSegmentsForInterval("dataSource", interval);
Assert.assertEquals(1, newSegments.size());
Assert.assertNotEquals(oldSegments.get(0).getVersion(), newSegments.get(0).getVersion());
nit: maybe better to check that the new segment has a newer version than the old one.
Fixed
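Since segment versions are timestamp strings, the stronger check could look something like this (illustrative, not necessarily the exact line in the final test):

Assert.assertTrue(
    "reindex should create a newer version",
    oldSegments.get(0).getVersion().compareTo(newSegments.get(0).getVersion()) < 0
);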
INDEX_DATASOURCE,
REINDEX_TASK,
REINDEX_QUERIES_RESOURCE,
true
By any chance, is it possible that this makes the test flaky? It will block while getSegmentVersions() returns a single version. What happens if overshadowed segments are removed from CoordinatorServerView between retries?
Good call. I'll do a more direct check that any new versions have shown up.
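Something along these lines, perhaps: wait for a version strictly newer than any recorded before reindexing, rather than waiting for the version count to grow. (getSegmentVersions() is the method mentioned above; the retry helper and maxVersionBeforeReindex are hypothetical placeholders.)

// Wait until a segment version newer than any pre-reindex version appears,
// instead of counting versions (which can shrink again when overshadowed
// segments are dropped from the CoordinatorServerView).
retryUntilTrue(
    () -> getSegmentVersions(INDEX_DATASOURCE).stream()
        .anyMatch(version -> version.compareTo(maxVersionBeforeReindex) > 0),
    "waiting for a new segment version to appear"
);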
This enables ParallelIndexSupervisorTask to dynamically request locks at runtime if it is run without explicit intervals in the granularity spec and with appendToExisting set to false. Previously, it behaved as if appendToExisting was set to true, which was undocumented and inconsistent with IndexTask and Hadoop indexing. Also, when ParallelIndexSupervisorTask allocates segments in the explicit interval case, fail if its locks on the interval have been revoked. Also make a few other additions/clarifications to native ingestion docs. Fixes #6989.
PR description on GitHub updated to match.
OK, test concerns fixed and passing CI, and docs made a little scarier. |
LGTM. @glasser thanks!
…7046) (#7113)

* index_parallel: support !appendToExisting with no explicit intervals

This enables ParallelIndexSupervisorTask to dynamically request locks at runtime if it is run without explicit intervals in the granularity spec and with appendToExisting set to false. Previously, it behaved as if appendToExisting was set to true, which was undocumented and inconsistent with IndexTask and Hadoop indexing.

Also, when ParallelIndexSupervisorTask allocates segments in the explicit interval case, fail if its locks on the interval have been revoked.

Also make a few other additions/clarifications to native ingestion docs.

Fixes #6989.

* Review feedback. PR description on GitHub updated to match.
* Make native batch ingestion partitions start at 0
* Fix to previous commit
* Unit test. Verified to fail without the other commits on this branch.
* Another round of review
* Slightly scarier warning
This enables ParallelIndexSupervisorTask to dynamically request locks at runtime
if it is run without explicit intervals in the granularity spec and with
appendToExisting set to false. Previously, it behaved as if appendToExisting
was set to true, which was undocumented and inconsistent with IndexTask and
Hadoop indexing.
Also, when ParallelIndexSupervisorTask allocates segments in the explicit
interval case, fail fast when allocating new segments if any of its locks on the
interval have been revoked.
Also, make segment partition numbers allocated by native batch ingestion without
appendToExisting start at 0 rather than 1.
Fixes #6989.