Add compaction task #4985

Merged · 21 commits · Nov 4, 2017
Changes from 3 commits
2 changes: 1 addition & 1 deletion docs/content/ingestion/firehose.md
@@ -79,7 +79,7 @@ A sample ingest firehose spec is shown below -
|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes|
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
-|filter| See [Filters](../querying/filters.html)|yes|
+|filter| See [Filters](../querying/filters.html)|no|
> **Contributor:** Good catch.
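
As an illustration of the corrected row (not part of this PR's diff), an ingestSegment firehose spec that uses the optional filter might look like the sketch below; the dataSource, interval, and filter values are hypothetical:

```json
{
  "type": "ingestSegment",
  "dataSource": "wikipedia",
  "interval": "2017-01-01/2017-01-02",
  "filter": {
    "type": "selector",
    "dimension": "page",
    "value": "Druid"
  }
}
```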


#### CombiningFirehose

84 changes: 66 additions & 18 deletions docs/content/ingestion/tasks.md
@@ -104,7 +104,7 @@ Tasks can have different default priorities depending on their types. Here are a
|---------|----------------|
|Realtime index task|75|
|Batch index task|50|
-|Merge/Append task|25|
+|Merge/Append/Compation task|25|
> **Contributor:** Compaction (spelling)

> **Author:** Fixed.

|Other tasks|0|

You can override the task priority by setting your priority in the task context like below.
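The referenced example is folded out of this diff view; a minimal sketch of a context that sets the priority (the value is illustrative):

```json
{
  "context": {
    "priority": 100
  }
}
```
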
@@ -184,19 +184,6 @@ On the contrary, in the incremental publishing mode, segments are incrementally

To enable bulk publishing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot be used with either `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig.

-### Task Context
-
-The task context is used for various task configuration parameters. The following parameters apply to all tasks.
-
-|property|default|description|
-|--------|-------|-----------|
-|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [the below Locking section](#locking).|
-
-<div class="note caution">
-When a task acquires a lock, it sends a request via HTTP and awaits until it receives a response containing the lock acquisition result.
-As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than `druid.server.http.maxIdleTime` of overlords.
-</div>

Segment Merging Tasks
---------------------

@@ -210,7 +197,8 @@ Append tasks append a list of segments together into a single segment (one after
"id": <task_id>,
"dataSource": <task_datasource>,
"segments": <JSON list of DataSegment objects to append>,
"aggregations": <optional list of aggregators>
"aggregations": <optional list of aggregators>,
"context": <task context>
}
```
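
As an illustration (not part of this PR's diff), a filled-in append spec might look like the sketch below. The segment descriptors are abbreviated and hypothetical; real specs use the full DataSegment JSON objects, e.g. as returned by the Coordinator's metadata API.

```json
{
  "type": "append",
  "id": "append_wikipedia_example",
  "dataSource": "wikipedia",
  "segments": [
    { "dataSource": "wikipedia", "interval": "2017-01-01/2017-01-02", "version": "2017-01-02T00:00:00.000Z" },
    { "dataSource": "wikipedia", "interval": "2017-01-02/2017-01-03", "version": "2017-01-03T00:00:00.000Z" }
  ],
  "context": { "priority": 25 }
}
```
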

Expand All @@ -228,7 +216,8 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"segments": <JSON list of DataSegment objects to merge>
"segments": <JSON list of DataSegment objects to merge>,
"context": <task context>
}
```

@@ -245,10 +234,53 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"interval": <DataSegment objects in this interval are going to be merged>
"interval": <DataSegment objects in this interval are going to be merged>,
"context": <task context>
}
```
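
As an illustration (not part of this PR's diff), a filled-in spec for this task might look like the sketch below; the task type string, dataSource, and interval are assumptions for the example, and `longSum` is a standard Druid aggregator:

```json
{
  "type": "same_interval_merge",
  "dataSource": "wikipedia",
  "aggregations": [
    { "type": "longSum", "name": "added", "fieldName": "added" }
  ],
  "rollup": true,
  "interval": "2017-01-01/2017-01-02",
  "context": { "taskLockTimeout": 300000 }
}
```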

### Compaction Task

Compaction tasks merge all segments of the given interval. The syntax is:
> **Contributor:** I think this should include a segmentGranularity too. Unless your idea is that the interval specified should just be one segment's worth of interval, in which case, that should be said in the docs.

> **Author:** Yeah, all the segments of the interval are always merged into a single segment. I added the below statement.
>
> "This compaction task merges all segments of the interval 2017-01-01/2018-01-01 into a single segment."

> **Contributor:** I suggest adding two more sentences:
>
> To merge each day's worth of data into a separate segment, you can submit multiple "compact" tasks, one for each day. They will run in parallel.

> **Author:** Added.


```json
{
"type": "compact",
"id": <task_id>,
"dataSource": <task_datasource>,
"interval": <interval to specify segments to be merged>,
"tuningConfig" <index task tuningConfig>,
"context": <task context>
}
```

|Field|Description|Required|
|-----|-----------|--------|
|`type`|Task type. Should be `compact`|Yes|
|`id`|Task id|No|
|`dataSource`|dataSource name to be compacted|Yes|
|`interval`|interval of segments to be compacted|Yes|
|`tuningConfig`|[Index task tuningConfig](#tuningconfig)|No|
|`context`|[Task context](#taskcontext)|No|

An example compaction task is:

```json
{
"type" : "compact",
"dataSource" : "wikipedia",
"interval" : "2017-01-01/2018-01-01"
}
```

This compaction task merges _all segments_ of the interval `2017-01-01/2018-01-01`.
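
Following the reviewer's suggestion above, day-by-day compaction can be achieved by submitting one task per day; each spec below would be submitted as a separate task, and the tasks can run in parallel (the intervals are illustrative):

```json
{ "type": "compact", "dataSource": "wikipedia", "interval": "2017-01-01/2017-01-02" }
```

```json
{ "type": "compact", "dataSource": "wikipedia", "interval": "2017-01-02/2017-01-03" }
```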

A compaction task internally generates an indexTask spec for performing compaction work with some fixed parameters.
> **Contributor:** Probably more clear:
>
> generates an "index" task spec

> **Author:** Done.

For example, its `firehose` is always the [ingestSegmentSpec](./firehose.html), and `dimensionsSpec` and `metricsSpec`
always include all dimensions and metrics of the input segments.

Note that all input segments should have the same `queryGranularity` and `rollup`. See [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes) for more details.
> **@gianm (Oct 25, 2017):** What happens if they don't have consistent queryGranularity and rollup? (Docs should say and it should hopefully be reasonable, since this situation may happen in real life.)

> **Author:** Good point. It threw an exception before, but now it automatically checks and sets rollup if it is set for all input segments.


Segment Destroying Tasks
------------------------

@@ -261,7 +293,8 @@ Kill tasks delete all information about a segment and remove it from deep storage
"type": "kill",
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_segments_in_this_interval_will_die!>
"interval" : <all_segments_in_this_interval_will_die!>,
"context": <task context>
}
```
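
For example (not part of this PR's diff; the dataSource and interval are hypothetical), a concrete kill task spec might be:

```json
{
  "type": "kill",
  "dataSource": "wikipedia",
  "interval": "2016-01-01/2016-02-01"
}
```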

@@ -342,6 +375,21 @@ These tasks start, sleep for a time and are used only for testing. The available
}
```

Task Context
------------

The task context is used for various task configuration parameters. The following parameters apply to all task types.

|property|default|description|
|--------|-------|-----------|
|taskLockTimeout|300000|Task lock timeout in milliseconds. For more details, see [the below Locking section](#locking).|
|priority|Varies by task type. See [Task Priority](#task-priority).|Task priority|

<div class="note caution">
When a task acquires a lock, it sends a request via HTTP and waits until it receives a response containing the lock acquisition result.
As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than `druid.server.http.maxIdleTime` of overlords.
</div>
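
As a sketch of how these parameters might be combined on a single task (values illustrative; the spec shape follows the compaction example above):

```json
{
  "type": "compact",
  "dataSource": "wikipedia",
  "interval": "2017-01-01/2018-01-01",
  "context": {
    "taskLockTimeout": 600000,
    "priority": 50
  }
}
```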

Locking
-------

AbstractTask.java

@@ -33,7 +33,9 @@
import io.druid.query.QueryRunner;
import org.joda.time.Interval;

+import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@@ -80,15 +82,27 @@ protected AbstractTask(
this.context = context;
}

-  public static String makeId(String id, final String typeName, String dataSource, Interval interval)
+  static String getOrMakeId(String id, final String typeName, String dataSource)
   {
-    return id != null ? id : joinId(
-      typeName,
-      dataSource,
-      interval.getStart(),
-      interval.getEnd(),
-      DateTimes.nowUtc().toString()
-    );
+    return getOrMakeId(id, typeName, dataSource, null);
   }

+  static String getOrMakeId(String id, final String typeName, String dataSource, @Nullable Interval interval)
+  {
+    if (id != null) {
+      return id;
+    }
+
+    final List<Object> objects = new ArrayList<>();
+    objects.add(typeName);
+    objects.add(dataSource);
+    if (interval != null) {
+      objects.add(interval.getStart());
+      objects.add(interval.getEnd());
+    }
+    objects.add(DateTimes.nowUtc().toString());
+
+    return joinId(objects);
+  }

@JsonProperty
@@ -167,7 +181,12 @@ public String toString()
*
* @return string of joined objects
*/
-  public static String joinId(Object... objects)
+  static String joinId(List<Object> objects)
   {
     return ID_JOINER.join(objects);
   }
+
+  static String joinId(Object...objects)
+  {
+    return ID_JOINER.join(objects);
+  }
@@ -202,7 +221,7 @@ public int hashCode()
return id.hashCode();
}

-  protected List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
+  static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
{
return client.submit(new LockListAction());
}
ArchiveTask.java

@@ -47,7 +47,7 @@ public ArchiveTask(
)
{
super(
-makeId(id, "archive", dataSource, interval),
+getOrMakeId(id, "archive", dataSource, interval),
dataSource,
interval,
context