Skip to content

Commit

Permalink
remove same_interval_merge_sub from Task.java and remove other no nee…
Browse files Browse the repository at this point in the history
…ded code
  • Loading branch information
kaijianding committed Mar 3, 2017
1 parent b94a61e commit 71080ab
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@

package io.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.timeline.DataSegment;
Expand Down Expand Up @@ -126,11 +123,11 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
SubTask mergeTask = new SubTask(
getId(),
getDataSource(),
segments,
aggregators,
rollup,
indexSpec,
buildV9Directly,
segments,
getContext()
);
final TaskStatus status = mergeTask.run(toolbox);
Expand All @@ -142,18 +139,15 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception

public static class SubTask extends MergeTask
{
private static final String TYPE = "same_interval_merge_sub";

@JsonCreator
public SubTask(
@JsonProperty("baseId") String baseId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("context") Map<String, Object> context
String baseId,
String dataSource,
List<DataSegment> segments,
List<AggregatorFactory> aggregators,
Boolean rollup,
IndexSpec indexSpec,
Boolean buildV9Directly,
Map<String, Object> context
)
{
super(
Expand All @@ -168,22 +162,10 @@ public SubTask(
);
}

@Override
public String getType()
{
return TYPE;
}

@Override
protected void verifyInputSegments(List<DataSegment> segments)
{
// do nothing
}

@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return taskActionClient.submit(new LockTryAcquireAction(getInterval())) != null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@
@JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentBackwardsCompatibleTask.SubTask.class), // backwards compat - Deprecated
@JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class),
@JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class),
@JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class),
@JsonSubTypes.Type(name = "same_interval_merge_sub", value = SameIntervalMergeTask.SubTask.class)
@JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class)
})
public interface Task
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,49 +372,6 @@ public void testSameIntervalMergeTaskSerde() throws Exception
);
}

@Test
public void testSameIntervalMergeSubTaskSerde() throws Exception
{
final List<DataSegment> segments = ImmutableList.<DataSegment>of(
DataSegment.builder()
.dataSource("foo")
.interval(new Interval("2010-01-01/P1D"))
.version("1234")
.build()
);
final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("cnt"));
final SameIntervalMergeTask.SubTask task = new SameIntervalMergeTask.SubTask(
null,
"foo",
aggregators,
true,
indexSpec,
true,
segments,
null
);

final String json = jsonMapper.writeValueAsString(task);

Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final SameIntervalMergeTask.SubTask task2 = (SameIntervalMergeTask.SubTask) jsonMapper.readValue(json, Task.class);

Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());

Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertEquals(task.getRollup(), task2.getRollup());
Assert.assertEquals(task.getIndexSpec(), task2.getIndexSpec());
Assert.assertEquals(task.getSegments(), task2.getSegments());
Assert.assertEquals(
task.getAggregators().get(0).getName(),
task2.getAggregators().get(0).getName()
);
}

@Test
public void testKillTaskSerde() throws Exception
{
Expand Down

0 comments on commit 71080ab

Please sign in to comment.