Allow task to override ForkingTaskRunner tunings and jvm settings #1604

Merged (1 commit, Sep 3, 2015)
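
This change threads an optional "context" map through every task constructor and exposes it on the Task interface, so a task can carry per-task settings to the runner. As a rough sketch of the new surface, a task could be built with a context like the following; the override key shown is an assumption for illustration only, since the keys the ForkingTaskRunner actually honors are defined in the runner changes, which this diff excerpt does not show:

import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.ArchiveTask;
import org.joda.time.Interval;

import java.util.Map;

public class TaskContextExample
{
  public static ArchiveTask archiveWithOverrides()
  {
    // Hypothetical override key: the keys actually honored by ForkingTaskRunner
    // are defined elsewhere in this PR, not in this diff excerpt.
    final Map<String, Object> context = ImmutableMap.<String, Object>of(
        "druid.indexer.runner.javaOpts", "-Xmx2g"
    );
    // ArchiveTask's constructor now takes the context as its final argument
    // (see ArchiveTask.java below); a null id lets makeId() generate one.
    return new ArchiveTask(
        null,
        "wikipedia",
        Interval.parse("2015-01-01/2015-02-01"),
        context
    );
  }
}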
AbstractFixedIntervalTask.java

@@ -22,6 +22,7 @@
 import com.google.common.base.Preconditions;
 import io.druid.indexing.common.actions.LockTryAcquireAction;
 import io.druid.indexing.common.actions.TaskActionClient;
+import java.util.Map;
 import org.joda.time.Interval;
 
 public abstract class AbstractFixedIntervalTask extends AbstractTask
@@ -32,47 +33,52 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
   protected AbstractFixedIntervalTask(
       String id,
       String dataSource,
-      Interval interval
+      Interval interval,
+      Map<String, Object> context
   )
   {
-    this(id, id, new TaskResource(id, 1), dataSource, interval);
+    this(id, id, new TaskResource(id, 1), dataSource, interval, context);
   }
 
   protected AbstractFixedIntervalTask(
       String id,
       TaskResource taskResource,
       String dataSource,
-      Interval interval
+      Interval interval,
+      Map<String, Object> context
   )
   {
     this(
         id,
         id,
         taskResource == null ? new TaskResource(id, 1) : taskResource,
         dataSource,
-        interval
+        interval,
+        context
     );
   }
 
   protected AbstractFixedIntervalTask(
       String id,
       String groupId,
       String dataSource,
-      Interval interval
+      Interval interval,
+      Map<String, Object> context
   )
   {
-    this(id, groupId, new TaskResource(id, 1), dataSource, interval);
+    this(id, groupId, new TaskResource(id, 1), dataSource, interval, context);
   }
 
   protected AbstractFixedIntervalTask(
       String id,
       String groupId,
       TaskResource taskResource,
       String dataSource,
-      Interval interval
+      Interval interval,
+      Map<String, Object> context
   )
   {
-    super(id, groupId, taskResource, dataSource);
+    super(id, groupId, taskResource, dataSource, context);
     this.interval = Preconditions.checkNotNull(interval, "interval");
     Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
   }
AbstractTask.java

@@ -28,6 +28,7 @@
 import io.druid.indexing.common.actions.LockListAction;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
+import java.util.Map;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -49,22 +50,25 @@ public abstract class AbstractTask implements Task
   @JsonIgnore
   private final String dataSource;
 
-  protected AbstractTask(String id, String dataSource)
+  private final Map<String, Object> context;
+
+  protected AbstractTask(String id, String dataSource, Map<String, Object> context)
   {
-    this(id, id, new TaskResource(id, 1), dataSource);
+    this(id, id, new TaskResource(id, 1), dataSource, context);
   }
 
-  protected AbstractTask(String id, String groupId, String dataSource)
+  protected AbstractTask(String id, String groupId, String dataSource, Map<String, Object> context)
   {
-    this(id, groupId, new TaskResource(id, 1), dataSource);
+    this(id, groupId, new TaskResource(id, 1), dataSource, context);
   }
 
-  protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource)
+  protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource, Map<String, Object> context)
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.groupId = Preconditions.checkNotNull(groupId, "groupId");
     this.taskResource = Preconditions.checkNotNull(taskResource, "resource");
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+    this.context = context;
   }
 
   public static String makeId(String id, final String typeName, String dataSource, Interval interval)
@@ -179,4 +183,18 @@ protected Iterable<TaskLock> getTaskLocks(TaskToolbox toolbox) throws IOException
   {
     return toolbox.getTaskActionClient().submit(new LockListAction());
   }
+
+  @Override
+  @JsonProperty
+  public Map<String, Object> getContext()
+  {
+    return context;
+  }
+
+  @Override
+  public Object getContextValue(String key)
+  {
+    return context == null ? null : context.get(key);
+  }
+
 }
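
The getContext() / getContextValue(key) accessors added above are the read side of the feature. A minimal sketch of how a runner might resolve a per-task override against its node-level default, assuming a hypothetical key name (the lookup the ForkingTaskRunner actually performs lives in this PR's runner changes, not shown here):

import io.druid.indexing.common.task.Task;

public class ContextOverrides
{
  // Sketch only: fall back to the node-level default when the task carries no
  // override. The key name is an assumption for illustration.
  public static String resolveJavaOpts(Task task, String nodeDefaultJavaOpts)
  {
    final Object override = task.getContextValue("druid.indexer.runner.javaOpts");
    return override == null ? nodeDefaultJavaOpts : String.valueOf(override);
  }
}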
AppendTask.java

@@ -55,10 +55,11 @@ public AppendTask(
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List<DataSegment> segments,
-      @JsonProperty("indexSpec") IndexSpec indexSpec
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("context") Map<String, Object> context
   )
   {
-    super(id, dataSource, segments);
+    super(id, dataSource, segments, context);
     this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
   }
 
ArchiveTask.java

@@ -28,6 +28,7 @@
 import io.druid.indexing.common.actions.SegmentListUnusedAction;
 import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
 import io.druid.timeline.DataSegment;
+import java.util.Map;
 import org.joda.time.Interval;
 
 import java.util.List;
@@ -39,13 +40,15 @@ public class ArchiveTask extends AbstractFixedIntervalTask
   public ArchiveTask(
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("context") Map<String, Object> context
   )
   {
     super(
         makeId(id, "archive", dataSource, interval),
         dataSource,
-        interval
+        interval,
+        context
     );
   }
 
ConvertSegmentBackwardsCompatibleTask.java

@@ -27,7 +27,8 @@ public ConvertSegmentBackwardsCompatibleTask(
         segment,
         indexSpec,
         force == null ? false : force,
-        validate == null ? false : validate
+        validate == null ? false : validate,
+        null
     );
   }
 
@@ -43,7 +44,7 @@ public SubTask(
         @JsonProperty("validate") Boolean validate
     )
     {
-      super(groupId, segment, indexSpec, force, validate);
+      super(groupId, segment, indexSpec, force, validate, null);
     }
   }
 }
ConvertSegmentTask.java

@@ -81,11 +81,12 @@ public static ConvertSegmentTask create(
       Interval interval,
       IndexSpec indexSpec,
       boolean force,
-      boolean validate
+      boolean validate,
+      Map<String, Object> context
   )
   {
     final String id = makeId(dataSource, interval);
-    return new ConvertSegmentTask(id, dataSource, interval, null, indexSpec, force, validate);
+    return new ConvertSegmentTask(id, dataSource, interval, null, indexSpec, force, validate, context);
   }
 
   /**
@@ -98,12 +99,13 @@ public static ConvertSegmentTask create(
    *
    * @return A SegmentConverterTask for the segment with the indexSpec specified.
    */
-  public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force, boolean validate)
+  public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force, boolean validate, Map<String, Object> context
+  )
   {
     final Interval interval = segment.getInterval();
     final String dataSource = segment.getDataSource();
     final String id = makeId(dataSource, interval);
-    return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, force, validate);
+    return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, force, validate, context);
   }
 
   protected static String makeId(String dataSource, Interval interval)
@@ -121,19 +123,20 @@ private static ConvertSegmentTask createFromJson(
       @JsonProperty("segment") DataSegment segment,
       @JsonProperty("indexSpec") IndexSpec indexSpec,
       @JsonProperty("force") Boolean force,
-      @JsonProperty("validate") Boolean validate
+      @JsonProperty("validate") Boolean validate,
+      @JsonProperty("context") Map<String, Object> context
   )
   {
     final boolean isForce = force == null ? false : force;
     final boolean isValidate = validate == null ? true : validate;
     if (id == null) {
       if (segment == null) {
-        return create(dataSource, interval, indexSpec, isForce, isValidate);
+        return create(dataSource, interval, indexSpec, isForce, isValidate, context);
       } else {
-        return create(segment, indexSpec, isForce, isValidate);
+        return create(segment, indexSpec, isForce, isValidate, context);
       }
     }
-    return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, isForce, isValidate);
+    return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, isForce, isValidate, context);
   }
 
   protected ConvertSegmentTask(
@@ -143,10 +146,11 @@ protected ConvertSegmentTask(
       DataSegment segment,
       IndexSpec indexSpec,
       boolean force,
-      boolean validate
+      boolean validate,
+      Map<String, Object> context
   )
   {
-    super(id, dataSource, interval);
+    super(id, dataSource, interval, context);
     this.segment = segment;
     this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
     this.force = force;
@@ -224,7 +228,7 @@ public boolean apply(DataSegment segment)
       segmentsToUpdate = Collections.singleton(segment);
     }
     // Vestigial from a past time when this task spawned subtasks.
-    for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate)) {
+    for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate, getContext())) {
       final TaskStatus status = subTask.run(toolbox);
       if (!status.isSuccess()) {
         return TaskStatus.fromCode(getId(), status.getStatusCode());
@@ -238,7 +242,8 @@ protected Iterable<Task> generateSubTasks(
       final Iterable<DataSegment> segments,
       final IndexSpec indexSpec,
       final boolean force,
-      final boolean validate
+      final boolean validate,
+      final Map<String, Object> context
   )
   {
     return Iterables.transform(
@@ -248,7 +253,7 @@ protected Iterable<Task> generateSubTasks(
           @Override
           public Task apply(DataSegment input)
           {
-            return new SubTask(groupId, input, indexSpec, force, validate);
+            return new SubTask(groupId, input, indexSpec, force, validate, context);
           }
         }
     );
@@ -287,7 +292,8 @@ public SubTask(
         @JsonProperty("segment") DataSegment segment,
         @JsonProperty("indexSpec") IndexSpec indexSpec,
         @JsonProperty("force") Boolean force,
-        @JsonProperty("validate") Boolean validate
+        @JsonProperty("validate") Boolean validate,
+        @JsonProperty("context") Map<String, Object> context
     )
     {
       super(
@@ -300,7 +306,8 @@ public SubTask(
           ),
           groupId,
           segment.getDataSource(),
-          segment.getInterval()
+          segment.getInterval(),
+          context
       );
       this.segment = segment;
       this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
HadoopConverterTask.java

@@ -63,7 +63,8 @@ public HadoopConverterTask(
       @JsonProperty("distributedSuccessCache") URI distributedSuccessCache,
       @JsonProperty("jobPriority") String jobPriority,
       @JsonProperty("segmentOutputPath") String segmentOutputPath,
-      @JsonProperty("classpathPrefix") String classpathPrefix
+      @JsonProperty("classpathPrefix") String classpathPrefix,
+      @JsonProperty("context") Map<String, Object> context
   )
   {
     super(
@@ -78,7 +79,8 @@ public HadoopConverterTask(
         null, // Always call subtask codepath
         indexSpec,
         force,
-        validate == null ? true : validate
+        validate == null ? true : validate,
+        context
     );
     this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
     this.distributedSuccessCache = Preconditions.checkNotNull(distributedSuccessCache, "distributedSuccessCache");
@@ -130,13 +132,15 @@ protected Iterable<Task> generateSubTasks(
      final Iterable<DataSegment> segments,
      final IndexSpec indexSpec,
      final boolean force,
-     final boolean validate
+     final boolean validate,
+     Map<String, Object> context
  )
  {
    return Collections.<Task>singleton(
        new ConverterSubTask(
            ImmutableList.copyOf(segments),
-           this
+           this,
+           context
        )
    );
  }
@@ -164,7 +168,8 @@ public static class ConverterSubTask extends HadoopTask
     @JsonCreator
     public ConverterSubTask(
         @JsonProperty("segments") List<DataSegment> segments,
-        @JsonProperty("parent") HadoopConverterTask parent
+        @JsonProperty("parent") HadoopConverterTask parent,
+        @JsonProperty("context") Map<String, Object> context
     )
     {
       super(
@@ -175,7 +180,8 @@ public ConverterSubTask(
              parent.getInterval().getEnd()
          ),
          parent.getDataSource(),
-         parent.getHadoopDependencyCoordinates()
+         parent.getHadoopDependencyCoordinates(),
+         context
      );
      this.segments = segments;
      this.parent = parent;
HadoopIndexTask.java

@@ -45,6 +45,7 @@
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
 import io.druid.timeline.DataSegment;
+import java.util.Map;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -88,15 +89,17 @@ public HadoopIndexTask(
       @JsonProperty("hadoopCoordinates") String hadoopCoordinates,
       @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
       @JsonProperty("classpathPrefix") String classpathPrefix,
-      @JacksonInject ObjectMapper jsonMapper
+      @JacksonInject ObjectMapper jsonMapper,
+      @JsonProperty("context") Map<String, Object> context
   )
   {
     super(
         id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec), new DateTime()),
         getTheDataSource(spec),
         hadoopDependencyCoordinates == null
         ? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates))
-        : hadoopDependencyCoordinates
+        : hadoopDependencyCoordinates,
+        context
     );
 
 