index_parallel: support !appendToExisting with no explicit intervals (#7046)

* index_parallel: support !appendToExisting with no explicit intervals

This enables ParallelIndexSupervisorTask to dynamically request locks at runtime
if it is run without explicit intervals in the granularity spec and with
appendToExisting set to false.  Previously, it behaved as if appendToExisting
was set to true, which was undocumented and inconsistent with IndexTask and
Hadoop indexing.

Also, when ParallelIndexSupervisorTask allocates segments in the explicit
interval case, fail if its locks on the interval have been revoked.

Also make a few other additions/clarifications to native ingestion docs.

Fixes #6989.

* Review feedback.

PR description on GitHub updated to match.

* Make native batch ingestion partitions start at 0

* Fix to previous commit

* Unit test. Verified to fail without the other commits on this branch.

* Another round of review

* Slightly scarier warning
glasser authored and jihoonson committed Feb 20, 2019
1 parent 9a52152 commit a81b1b8
Showing 13 changed files with 262 additions and 31 deletions.
4 changes: 2 additions & 2 deletions docs/content/ingestion/ingestion-spec.md
@@ -286,7 +286,7 @@ This spec is used to generate segments with uniform intervals.
| segmentGranularity | string | The granularity to create time chunks at. Multiple segments can be created per time chunk. For example, with 'DAY' `segmentGranularity`, the events of the same day fall into the same time chunk which can be optionally further partitioned into multiple segments based on other configurations and input size. See [Granularity](../querying/granularities.html) for supported granularities.| no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. A granularity of 'NONE' means millisecond granularity. See [Granularity](../querying/granularities.html) for supported granularities.| no (default == 'NONE') |
| rollup | boolean | Whether to enable rollup at ingestion time. | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, batch ingestion tasks may skip determining partitions phase which results in faster ingestion. |
| intervals | JSON string array | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, Hadoop and native non-parallel batch ingestion tasks may skip the determine-partitions phase, which results in faster ingestion; native parallel ingestion tasks can request all their locks up-front instead of one by one. Batch ingestion will throw away any data not in the specified intervals. |

### Arbitrary Granularity Spec

@@ -296,7 +296,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre
|-------|------|-------------|----------|
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. A granularity of 'NONE' means millisecond granularity. See [Granularity](../querying/granularities.html) for supported granularities.| no (default == 'NONE') |
| rollup | boolean | Whether to enable rollup at ingestion time. | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, batch ingestion tasks may skip determining partitions phase which results in faster ingestion. |
| intervals | JSON string array | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, Hadoop and native non-parallel batch ingestion tasks may skip the determine-partitions phase, which results in faster ingestion; native parallel ingestion tasks can request all their locks up-front instead of one by one. Batch ingestion will throw away any data not in the specified intervals. |

# Transform Spec

27 changes: 27 additions & 0 deletions docs/content/ingestion/native_tasks.md
@@ -30,6 +30,9 @@ MiddleManager.

Please check [Hadoop-based Batch Ingestion VS Native Batch Ingestion](./hadoop-vs-native-batch.html) for differences between native batch ingestion and Hadoop-based ingestion.

To run either kind of native batch indexing task, write an ingestion spec as specified below. Then POST it to the
[`/druid/indexer/v1/task` endpoint on the Overlord](../operations/api-reference.html#tasks), or use the `post-index-task` script included with Druid.

Parallel Index Task
--------------------------------

@@ -124,6 +127,11 @@ An example ingestion spec is:
}
```

By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
instead, set the `appendToExisting` flag in the `ioConfig`. Note that it only replaces data in segments where it actively adds
data: if there are segments in your `granularitySpec`'s intervals that have no data written by this task, they will be
left alone.
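
As a rough sketch only (the firehose settings below are placeholders for whatever input you actually use, not part of this change), an `ioConfig` that appends to existing segments might look like:

```json
{
  "ioConfig": {
    "type": "index_parallel",
    "firehose": {
      "type": "local",
      "baseDir": "/path/to/data",
      "filter": "*.json"
    },
    "appendToExisting": true
  }
}
```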

#### Task Properties

|property|description|required?|
@@ -139,6 +147,14 @@ This field is required.

See [Ingestion Spec DataSchema](../ingestion/ingestion-spec.html#dataschema)

If you specify `intervals` explicitly in your `dataSchema`'s `granularitySpec`, batch ingestion will lock the full intervals
specified when it starts up, and you will learn quickly if the specified intervals overlap with locks held by other
tasks (e.g., Kafka ingestion). Otherwise, batch ingestion will lock each interval as it is discovered, so you may only
learn that the task overlaps with a higher-priority task later in ingestion. If you specify `intervals` explicitly, any
rows outside the specified intervals will be thrown away. We recommend setting `intervals` explicitly if you know the
time range of the data, so that locking failures surface faster and so that you don't accidentally replace data outside
that range if there's some stray data with unexpected timestamps.
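
For illustration (a sketch only; the dates are placeholders for the actual time range of your data), a uniform `granularitySpec` with explicit `intervals` could look like:

```json
{
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "DAY",
    "queryGranularity": "NONE",
    "intervals": ["2017-12-24/2017-12-26"]
  }
}
```

With this spec, the task would try to lock those two days up-front, and any row whose timestamp falls outside 2017-12-24/2017-12-26 would be thrown away.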

#### IOConfig

|property|description|default|required?|
@@ -463,6 +479,11 @@ The Local Index Task is designed to be used for smaller data sets. The task exec
}
```

By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
instead, set the `appendToExisting` flag in the `ioConfig`. Note that it only replaces data in segments where it actively adds
data: if there are segments in your `granularitySpec`'s intervals that have no data written by this task, they will be
left alone.
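
A sketch of the equivalent `ioConfig` for the Local Index Task (placeholder firehose settings; the intended difference from the parallel example above is only the `type`):

```json
{
  "ioConfig": {
    "type": "index",
    "firehose": {
      "type": "local",
      "baseDir": "/path/to/data",
      "filter": "*.json"
    },
    "appendToExisting": true
  }
}
```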

#### Task Properties

|property|description|required?|
@@ -478,6 +499,12 @@ This field is required.

See [Ingestion Spec DataSchema](../ingestion/ingestion-spec.html#dataschema)

If you do not specify `intervals` explicitly in your `dataSchema`'s `granularitySpec`, the Local Index Task will do an extra
pass over the data to determine the range to lock when it starts up. If you specify `intervals` explicitly, any rows
outside the specified intervals will be thrown away. We recommend setting `intervals` explicitly if you know the time
range of the data, so that the task can skip the extra pass and so that you don't accidentally replace data outside
that range if there's some stray data with unexpected timestamps.
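
A sketch of a `granularitySpec` that lets the Local Index Task skip the extra determine-range pass (the interval value is a placeholder):

```json
{
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "HOUR",
    "queryGranularity": "NONE",
    "intervals": ["2017-12-24/2017-12-25"]
  }
}
```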

#### IOConfig

|property|description|default|required?|
@@ -25,15 +25,15 @@

public final class Counters
{
public static <K> int incrementAndGetInt(ConcurrentHashMap<K, AtomicInteger> counters, K key)
public static <K> int getAndIncrementInt(ConcurrentHashMap<K, AtomicInteger> counters, K key)
{
// get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed.
// See https://github.com/apache/incubator-druid/pull/6898#discussion_r251384586.
AtomicInteger counter = counters.get(key);
if (counter == null) {
counter = counters.computeIfAbsent(key, k -> new AtomicInteger());
}
return counter.incrementAndGet();
return counter.getAndIncrement();
}

public static <K> long incrementAndGetLong(ConcurrentHashMap<K, AtomicLong> counters, K key)
@@ -235,9 +235,8 @@ private SegmentAllocator createSegmentAllocator(
)
{
final DataSchema dataSchema = ingestionSchema.getDataSchema();
final boolean explicitIntervals = dataSchema.getGranularitySpec().bucketIntervals().isPresent();
final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
if (ioConfig.isAppendToExisting() || !explicitIntervals) {
if (ioConfig.isAppendToExisting()) {
return new ActionBasedSegmentAllocator(
toolbox.getTaskActionClient(),
dataSchema,
@@ -33,8 +33,10 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockTryAcquireAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractTask;
@@ -360,43 +362,79 @@ SegmentIdWithShardSpec allocateNewSegment(DateTime timestamp) throws IOException
{
final String dataSource = getDataSource();
final GranularitySpec granularitySpec = getIngestionSchema().getDataSchema().getGranularitySpec();
final SortedSet<Interval> bucketIntervals = Preconditions.checkNotNull(
granularitySpec.bucketIntervals().orNull(),
"bucketIntervals"
);
final Optional<SortedSet<Interval>> bucketIntervals = granularitySpec.bucketIntervals();

// List locks whenever allocating a new segment because locks might be revoked and no longer valid.
final Map<Interval, String> versions = toolbox
final List<TaskLock> locks = toolbox
.getTaskActionClient()
.submit(new LockListAction())
.submit(new LockListAction());
final TaskLock revokedLock = locks.stream().filter(TaskLock::isRevoked).findAny().orElse(null);
if (revokedLock != null) {
throw new ISE("Lock revoked: [%s]", revokedLock);
}
final Map<Interval, String> versions = locks
.stream()
.collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));

final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
if (!maybeInterval.isPresent()) {
throw new IAE("Could not find interval for timestamp [%s]", timestamp);
}
Interval interval;
String version;
boolean justLockedInterval = false;
if (bucketIntervals.isPresent()) {
// If the granularity spec has explicit intervals, we just need to find the interval (of the segment
// granularity); we already tried to lock it at task startup.
final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
if (!maybeInterval.isPresent()) {
throw new IAE("Could not find interval for timestamp [%s]", timestamp);
}

interval = maybeInterval.get();
if (!bucketIntervals.get().contains(interval)) {
throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
}

final Interval interval = maybeInterval.get();
if (!bucketIntervals.contains(interval)) {
throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
version = findVersion(versions, interval);
if (version == null) {
throw new ISE("Cannot find a version for interval[%s]", interval);
}
} else {
// We don't have explicit intervals. We can use the segment granularity to figure out what
// interval we need, but we might not have already locked it.
interval = granularitySpec.getSegmentGranularity().bucket(timestamp);
version = findVersion(versions, interval);
if (version == null) {
// We don't have a lock for this interval, so we should lock it now.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)),
"Cannot acquire a lock for interval[%s]", interval
);
version = lock.getVersion();
justLockedInterval = true;
}
}

final int partitionNum = Counters.incrementAndGetInt(partitionNumCountersPerInterval, interval);
final int partitionNum = Counters.getAndIncrementInt(partitionNumCountersPerInterval, interval);
if (justLockedInterval && partitionNum != 0) {
throw new ISE(
"Expected partitionNum to be 0 for interval [%s] right after locking, but got [%s]",
interval, partitionNum
);
}
return new SegmentIdWithShardSpec(
dataSource,
interval,
findVersion(versions, interval),
version,
new NumberedShardSpec(partitionNum, 0)
);
}

@Nullable
private static String findVersion(Map<Interval, String> versions, Interval interval)
{
return versions.entrySet().stream()
.filter(entry -> entry.getKey().contains(interval))
.map(Entry::getValue)
.findFirst()
.orElseThrow(() -> new ISE("Cannot find a version for interval[%s]", interval));
.orElse(null);
}

/**
@@ -113,6 +113,11 @@ public TaskLockbox getLockbox()
return lockbox;
}

public IndexerSQLMetadataStorageCoordinator getStorageCoordinator()
{
return storageCoordinator;
}

public TaskActionToolbox createTaskActionToolbox()
{
storageCoordinator.start();
@@ -36,6 +36,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@@ -51,6 +52,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSupervisorTaskTest
@@ -126,8 +128,7 @@ public void testIsReady() throws Exception
}
}

@Test
public void testWithoutInterval() throws Exception
private void runTestWithoutIntervalTask() throws Exception
{
final ParallelIndexSupervisorTask task = newTask(
null,
@@ -142,6 +143,29 @@ public void testWithoutInterval() throws Exception
prepareTaskForLocking(task);
Assert.assertTrue(task.isReady(actionClient));
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
shutdownTask(task);
}

@Test
public void testWithoutInterval() throws Exception
{
// Ingest all data.
runTestWithoutIntervalTask();

// Read the segments for one day.
final Interval interval = Intervals.of("2017-12-24/P1D");
final List<DataSegment> oldSegments =
getStorageCoordinator().getUsedSegmentsForInterval("dataSource", interval);
Assert.assertEquals(1, oldSegments.size());

// Reingest the same data. Each segment should get replaced by a segment with a newer version.
runTestWithoutIntervalTask();

// Verify that the segment has been replaced.
final List<DataSegment> newSegments =
getStorageCoordinator().getUsedSegmentsForInterval("dataSource", interval);
Assert.assertEquals(1, newSegments.size());
Assert.assertTrue(oldSegments.get(0).getVersion().compareTo(newSegments.get(0).getVersion()) < 0);
}

@Test()
@@ -32,6 +32,7 @@
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Interval;
@@ -41,6 +42,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CoordinatorResourceTestClient
{
@@ -80,6 +83,11 @@ private String getIntervalsURL(String dataSource)
return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
}

private String getFullSegmentsURL(String dataSource)
{
return StringUtils.format("%sdatasources/%s/segments?full", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
}

private String getLoadStatusURL()
{
return StringUtils.format("%s%s", getCoordinatorURL(), "loadstatus");
@@ -123,6 +131,25 @@ public List<String> getSegmentIntervals(final String dataSource)
return segments;
}

// return a set of the segment versions for the specified datasource
public Set<String> getSegmentVersions(final String dataSource)
{
ArrayList<DataSegment> segments;
try {
StatusResponseHolder response = makeRequest(HttpMethod.GET, getFullSegmentsURL(dataSource));

segments = jsonMapper.readValue(
response.getContent(), new TypeReference<List<DataSegment>>()
{
}
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
return segments.stream().map(s -> s.getVersion()).collect(Collectors.toSet());
}

private Map<String, Integer> getLoadStatus()
{
Map<String, Integer> status;