
index_parallel: support !appendToExisting with no explicit intervals #7046

Merged
merged 7 commits into from
Feb 20, 2019

Conversation

@glasser (Contributor) commented Feb 9, 2019

This enables ParallelIndexSupervisorTask to dynamically request locks at runtime
if it is run without explicit intervals in the granularity spec and with
appendToExisting set to false. Previously, it behaved as if appendToExisting
was set to true, which was undocumented and inconsistent with IndexTask and
Hadoop indexing.

Also, when ParallelIndexSupervisorTask allocates segments in the explicit
interval case, fail fast if any of its locks on the interval have been revoked.

Also, make segment partition numbers allocated by native batch ingestion without
appendToExisting start at 0 rather than 1.

Fixes #6989.

@glasser (Contributor Author) commented Feb 9, 2019

Work in progress.

I need to figure out how to effectively run the integration test suite and add to ITParallelIndexTest a second ingestion that replaces segments from the first ingestion. (It's unclear to me how to use mvn to run the integration tests while skipping unit tests, some of which fail because they require S3 credentials and which are also just slow; and while I got ./run_cluster.sh to work, I can't figure out how to get IntelliJ to connect to it.)

@glasser (Contributor Author) commented Feb 10, 2019

For reference: I ended up running the integration test with:

```shell
export PATH="/usr/local/opt/openssl/bin:$PATH"
export DOCKER_IP=127.0.0.1
mvn install -ff -DskipTests -B -e
(cd integration-tests && mvn verify -P integration-tests -Dit.test=ITParallelIndexTest)
```

The PATH update put a Homebrew openssl on my PATH; the run_cluster.sh script didn't properly set up certs with the default macOS openssl. (It also doesn't use set -e enough, so openssl failures didn't always cause the script to fail.) I only had to rerun mvn install when I changed actual server code, not integration test code. I also had to increase the RAM given to Docker for Mac to 6GB (in the Docker GUI settings).

@glasser (Contributor Author) commented Feb 10, 2019

Ok, this is ready for review. There are a few FIXME comments where I have questions. This implements a suggestion from @jihoonson.

@glasser (Contributor Author) commented Feb 10, 2019

(Added/tweaked some docs while I was at it.)

@jihoonson (Contributor):

@glasser thanks! I'll check soon.

@jihoonson jihoonson added the Bug label Feb 12, 2019
@jihoonson jihoonson self-assigned this Feb 12, 2019
@jihoonson (Contributor) left a comment:

@glasser thank you for the nice work! I left some comments. In addition to them, would you please add a unit test too? You have added a nice integration test, but a unit test would also be good because it's easier to debug. I think you can add some in ParallelIndexSupervisorTaskTest.

@@ -28,6 +28,9 @@ Druid currently has two types of native batch indexing tasks, `index_parallel` w
in parallel on multiple MiddleManager nodes, and `index` which will run a single indexing task locally on a single
MiddleManager.

To run either kind of native batch indexing task, write an ingestion spec as specified below. Then POST it to the
`/druid/indexer/v1/task` endpoint on your Overlord machine, or use the `post-index-task` script included with Druid.
Contributor:

Please add a link to http://druid.io/docs/latest/operations/api-reference.html#tasks for details of the API.

// List locks whenever allocating a new segment because locks might be revoked and no longer valid.
final Map<Interval, String> versions = toolbox
.getTaskActionClient()
.submit(new LockListAction())
.stream()
// FIXME note that the next line is new in this commit --- it's not relevant for fixing #6989,
// it just seems like a bug fix to make the code match its comment
Contributor:

Thanks for trying to fix this! I think it's not a bug, but the change looks useful because it makes the task fail earlier if it doesn't hold a valid lock.

The intention of returning revoked locks in LockListAction is that the caller should be notified about revoked locks, so that they can handle it properly. I think we should fail here immediately if any lock is revoked because there's no way to get a new lock once the lock is revoked.
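The fail-fast behavior being discussed can be sketched in isolation. Note that TaskLock below is a simplified stand-in carrying only the fields this illustration needs, not Druid's actual class:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RevokedLockCheck
{
  // Simplified stand-in for Druid's TaskLock; the real class carries more state.
  static final class TaskLock
  {
    final String interval;  // e.g. "2019-01-01/2019-01-02"
    final String version;
    final boolean revoked;

    TaskLock(String interval, String version, boolean revoked)
    {
      this.interval = interval;
      this.version = version;
      this.revoked = revoked;
    }
  }

  // Build the interval -> version map from the current lock list, but fail
  // immediately if any lock has been revoked: a revoked lock can never be
  // reacquired, so continuing to allocate segments would be futile.
  static Map<String, String> versionsFromLocks(List<TaskLock> locks)
  {
    final Map<String, String> versions = new HashMap<>();
    for (TaskLock lock : locks) {
      if (lock.revoked) {
        throw new IllegalStateException("Lock for interval[" + lock.interval + "] is revoked");
      }
      versions.put(lock.interval, lock.version);
    }
    return versions;
  }
}
```

In the real code the lock list comes from submitting a LockListAction to the task action client; the sketch only shows the check itself.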

Contributor Author:

The reason I think the current code doesn't match its comment is that if we didn't care about checking for revoked locks, we wouldn't need to run LockListAction again: we could just remember the versions we got when we took the locks out at the beginning of the task.

throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
version = findVersion(versions, interval);
if (version == null) {
// FIXME I'm preserving the existing error message here, but it seems like the message should
Contributor:

If we check revoked locks above, I think it's fine to keep the original message.

toolbox.getTaskActionClient().submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)),
"Cannot acquire a lock for interval[%s]", interval
);
if (!interval.equals(lock.getInterval())) {
Contributor:

I think it's fine to skip this check. The lock is created using the given interval, and so this must not happen.

Contributor:

Instead, maybe we want to check that this is the first partition of this interval. Probably worth checking that Counters.incrementAndGetInt(partitionNumCountersPerInterval, interval) returns 1.

Contributor:

BTW, partitionNum should actually use Counters.getAndIncrementInt(). I'm fine with fixing it in this PR if you want.

Contributor Author:

I'm not sure what you mean by the third comment: do you mean that the method should be renamed from Counters.incrementAndGetInt to Counters.getAndIncrementInt?

Contributor:

Ah, sorry. I mean we should add a new method which gets the value first and then increments the counter.

Contributor Author:

Ah, to make it start at 0 rather than 1? OK, done. Note that this was the only caller of Counters.incrementAndGetInt: should I delete it, or leave it around for extension compatibility reasons?

Contributor:

I think no one is using it. You should be fine to remove it.
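The counter change being agreed on here can be illustrated with a standalone sketch. The method names mirror Druid's Counters helper, but this is not the actual Druid code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// incrementAndGet returns the value *after* incrementing (first call yields 1),
// while getAndIncrement returns the value *before* incrementing (first call
// yields 0). Switching to the latter makes partition numbers start at 0.
public class PartitionCounters
{
  // Old style: the first partition number handed out for an interval is 1.
  static int incrementAndGetInt(ConcurrentHashMap<String, AtomicInteger> counters, String interval)
  {
    return counters.computeIfAbsent(interval, k -> new AtomicInteger()).incrementAndGet();
  }

  // New style: the first partition number handed out for an interval is 0.
  static int getAndIncrementInt(ConcurrentHashMap<String, AtomicInteger> counters, String interval)
  {
    return counters.computeIfAbsent(interval, k -> new AtomicInteger()).getAndIncrement();
  }
}
```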

new NumberedShardSpec(partitionNum, 0)
);
}

private static String findVersion(Map<Interval, String> versions, Interval interval)
private static @Nullable String findVersion(Map<Interval, String> versions, Interval interval)
Contributor:

nit: we usually add the annotation above the method.

);
}
catch (Exception e) {
throw Throwables.propagate(e);
Contributor:

nit: this is deprecated. Please use throw new RuntimeException(e) instead.
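For reference, the suggested replacement pattern looks like this (a generic sketch, not Druid code):

```java
// Guava's Throwables.propagate(e) is deprecated; the recommended replacement
// is to rethrow RuntimeExceptions as-is and wrap checked exceptions in a
// RuntimeException directly.
public class Propagation
{
  static RuntimeException wrap(Exception e)
  {
    if (e instanceof RuntimeException) {
      return (RuntimeException) e;  // already unchecked; no wrapping needed
    }
    return new RuntimeException(e);
  }
}
```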

@glasser (Contributor Author) commented Feb 13, 2019

I'm not quite sure how to write a test in ParallelIndexSupervisorTaskTest that fully exercises this. testWithoutInterval does exercise the new just-in-time locking code, but it doesn't explicitly validate that new versions of segments would be created. Is that adequate, or should I figure out a way to write a test that ingests some data twice and sees different versions the second time? I can't see how to get access to the metadata of published segments. Something in createTaskToolbox? There are a lot of nulls there :)

@jihoonson (Contributor):

Yeah, nulls.. We need to tidy it up later. I think you can list published segments from the metadata store. You may need to expose storageCoordinator in IngestionTestBase to call getUsedSegmentsForIntervals().

Side note: IngestionTestBase currently has two metadata stores of HeapMemoryTaskStorage and IndexerSQLMetadataStorageCoordinator which are based on heap and derby, respectively. This is lame and we should fix it later too.

@glasser (Contributor Author) commented Feb 13, 2019

OK, I fixed the counter to start at 0 (and removed the old helper), and added a unit test which fails without the other changes.

(I've been doing updates as separate commits but updating the PR description to match the full change, so the PR desc should work fine as a commit message for a squash of this PR. Let me know if you have a different preference for how to respond to code review.)

@jihoonson (Contributor):

Thanks for updating! I think commit messages are fine. I'll take another look. In the meantime, would you please check this CI failure?

[ERROR] /home/travis/build/apache/incubator-druid/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java:34:8: Unused import - org.apache.druid.java.util.common.logger.Logger. [UnusedImports]

@glasser (Contributor Author) commented Feb 13, 2019

I fixed that one already; the current failures are some odd Maven issues like

[FATAL] Non-resolvable parent POM for org.apache.druid.extensions.contrib:druid-momentsketch:[unknown-version]: Could not find artifact org.apache.druid:druid:pom:0.14.0-incubating-SNAPSHOT in sonatype-snapshots (https://oss.sonatype.org/content/repositories/snapshots/) and 'parent.relativePath' points at wrong local POM @ line 24, column 13

which I don't think are my fault? Unfortunately, as a non-owner of the project, I think the only way I can rerun these CI tasks is to push a new commit, and I can't even kill other subjobs in the old job.

@glasser (Contributor Author) commented Feb 13, 2019

Ah yeah #6581 seems to have broken master.

@glasser (Contributor Author) commented Feb 14, 2019

CI now passing.

@glasser (Contributor Author) commented Feb 15, 2019

This does now work, but I'm realizing I would never actually want to use it in my own use case: if your input data accidentally contains one stray row from outside the interval you're trying to replace, it'll delete a ton of data. (And we just found a bug that could lead to rows derived from our Kafka backups being outside the interval you'd think they'd be in.) Maybe I should add some scarier warnings to the docs?

Part of me does feel like it would be easier to reason about if non-appending batch ingestion always required you to specify a target interval and always replaced all segments inside that interval on success (like I was thinking about in part 3 of #6989 (comment)), but I'm sure there are use cases where that isn't desired. The current semantics of "non-appending batch ingestion replaces any data that happens to be within segmentGranularity of specific rows produced by the firehose" seems harder for me to reason about.

@jihoonson (Contributor):

@glasser hmm, good point. I think it would be fine to add a scary warning to the docs for now. Probably worth discussing how we can improve the native task behavior to be more intuitive.

@jihoonson (Contributor) left a comment:

@glasser thank you for the quick update!

final List<DataSegment> newSegments =
getStorageCoordinator().getUsedSegmentsForInterval("dataSource", interval);
Assert.assertEquals(1, newSegments.size());
Assert.assertNotEquals(oldSegments.get(0).getVersion(), newSegments.get(0).getVersion());
Contributor:

nit: maybe better to check that the new segment has a newer version than the old one.

Contributor Author:

Fixed
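The stricter assertion suggested above works because Druid segment versions are ISO-8601 timestamp strings, so lexicographic string comparison matches chronological order. A minimal sketch of the check:

```java
// Segment versions are ISO-8601 timestamps (e.g. "2019-02-16T01:23:45.678Z"),
// so String.compareTo orders them chronologically. Checking "strictly newer"
// is stronger than the original assertNotEquals on the two versions.
public class VersionCheck
{
  static boolean isNewer(String newVersion, String oldVersion)
  {
    return newVersion.compareTo(oldVersion) > 0;
  }
}
```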

INDEX_DATASOURCE,
REINDEX_TASK,
REINDEX_QUERIES_RESOURCE,
true
Contributor:

By any chance, is it possible that this makes the test flaky? It will block while getSegmentVersions() returns a single version. What happens if overshadowed segments are removed from CoordinatorServerView between retries?

Contributor Author:

Good call. I'll do a more direct check that any new versions have shown up.

@glasser (Contributor Author) commented Feb 16, 2019

OK, test concerns fixed and passing CI, and docs made a little scarier.

@jihoonson (Contributor) left a comment:

LGTM. @glasser thanks!

@jihoonson jihoonson merged commit a81b1b8 into apache:master Feb 20, 2019
@jihoonson jihoonson added this to the 0.14.0 milestone Feb 20, 2019
jihoonson pushed a commit to jihoonson/druid that referenced this pull request Feb 20, 2019
…pache#7046)

* index_parallel: support !appendToExisting with no explicit intervals

This enables ParallelIndexSupervisorTask to dynamically request locks at runtime
if it is run without explicit intervals in the granularity spec and with
appendToExisting set to false.  Previously, it behaved as if appendToExisting
was set to true, which was undocumented and inconsistent with IndexTask and
Hadoop indexing.

Also, when ParallelIndexSupervisorTask allocates segments in the explicit
interval case, fail if its locks on the interval have been revoked.

Also make a few other additions/clarifications to native ingestion docs.

Fixes apache#6989.

* Review feedback.

PR description on GitHub updated to match.

* Make native batch ingestion partitions start at 0

* Fix to previous commit

* Unit test. Verified to fail without the other commits on this branch.

* Another round of review

* Slightly scarier warning
jihoonson added a commit that referenced this pull request Feb 23, 2019
…7046) (#7113)
