Skip to content

Commit

Permalink
Fix mmless ingestion and index tasks (apache#15372)
Browse files Browse the repository at this point in the history
* Fix mmless ingestion and index tasks

* Move comment

* remove dup test
  • Loading branch information
George Shiqi Wu authored and ythorat2 committed Dec 1, 2023
1 parent c2ca0c7 commit 7e0e0f4
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode

private IngestionState ingestionState;

private boolean shouldCleanup;

@MonotonicNonNull
private ParseExceptionHandler determinePartitionsParseExceptionHandler;

Expand Down Expand Up @@ -206,7 +208,8 @@ public IndexTask(
null,
ingestionSchema,
context,
-1
-1,
true
);
}

Expand All @@ -218,7 +221,8 @@ public IndexTask(
@Nullable String baseSequenceName,
IndexIngestionSpec ingestionSchema,
Map<String, Object> context,
int maxAllowedLockCount
int maxAllowedLockCount,
boolean shouldCleanup
)
{
super(
Expand All @@ -233,6 +237,7 @@ public IndexTask(
this.baseSequenceName = baseSequenceName == null ? getId() : baseSequenceName;
this.ingestionSchema = ingestionSchema;
this.ingestionState = IngestionState.NOT_STARTED;
this.shouldCleanup = shouldCleanup;
}

@Override
Expand Down Expand Up @@ -1080,6 +1085,14 @@ private static InputFormat getInputFormat(IndexIngestionSpec ingestionSchema)
return ingestionSchema.getIOConfig().getNonNullInputFormat();
}

@Override
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
{
if (shouldCleanup) {
super.cleanUp(toolbox, taskStatus);
}
}

public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
{
private final DataSchema dataSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1204,7 +1204,9 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
),
getContext(),
getIngestionSchema().getTuningConfig().getMaxAllowedLockCount()
getIngestionSchema().getTuningConfig().getMaxAllowedLockCount(),
// Don't run cleanup in the IndexTask since we are wrapping it in the ParallelIndexSupervisorTask
false
);

if (currentSubTaskHolder.setTask(sequentialIndexTask)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
Expand Down Expand Up @@ -2690,6 +2691,72 @@ public void testErrorWhenDropFlagTrueAndOverwriteFalse() throws Exception
);
}

// If shouldCleanup is false, cleanup should be a no-op
@Test
public void testCleanupIndexTask() throws Exception
{
new IndexTask(
null,
null,
null,
"dataSource",
null,
createDefaultIngestionSpec(
jsonMapper,
temporaryFolder.newFolder(),
new UniformGranularitySpec(
Granularities.MINUTE,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
),
null,
0,
false
).cleanUp(null, null);
}

/* if shouldCleanup is true, we should fall back to AbstractTask.cleanup,
* check isEncapsulatedTask=false, and then exit.
*/
@Test
public void testCleanup() throws Exception
{
TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig);
EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false);
EasyMock.replay(toolbox, taskConfig);
new IndexTask(
null,
null,
null,
"dataSource",
null,
createDefaultIngestionSpec(
jsonMapper,
temporaryFolder.newFolder(),
new UniformGranularitySpec(
Granularities.MINUTE,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
),
null,
0,
true
).cleanUp(toolbox, null);
EasyMock.verify(toolbox, taskConfig);
}

public static void checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status)
{
// full stacktrace will be too long and make tests brittle (e.g. if line # changes), just match the main message
Expand Down

0 comments on commit 7e0e0f4

Please sign in to comment.