diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 14ecd4e90e82..63b0e985e0b7 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
@@ -30,6 +31,7 @@
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 
 public class KafkaIndexTaskTuningConfigTest
 {
@@ -145,6 +147,100 @@ public void testConvert()
     Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
   }
 
+  @Test
+  public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
+  {
+    KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig(
+        1,
+        null,
+        2,
+        10L,
+        new Period("PT3S"),
+        new File("/tmp/xxx"),
+        4,
+        new IndexSpec(),
+        true,
+        true,
+        5L,
+        null,
+        null,
+        null,
+        true,
+        42,
+        42
+    );
+
+    String serialized = mapper.writeValueAsString(base);
+    TestModifiedKafkaIndexTaskTuningConfig deserialized =
+        mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class);
+
+    Assert.assertEquals(null, deserialized.getExtra());
+    Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+    Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+    Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+    Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+    Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+    Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+    Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+    Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+    Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+    Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+    Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+    Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+    Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+    Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+    Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+    Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+    Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+  }
+
+  @Test
+  public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
+  {
+    TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig(
+        1,
+        null,
+        2,
+        10L,
+        new Period("PT3S"),
+        new File("/tmp/xxx"),
+        4,
+        new IndexSpec(),
+        true,
+        true,
+        5L,
+        null,
+        null,
+        null,
+        true,
+        42,
+        42,
+        "extra string"
+    );
+
+    String serialized = mapper.writeValueAsString(base);
+    KafkaIndexTaskTuningConfig deserialized =
+        mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class);
+
+    Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+    Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+    Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+    Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+    Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+    Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+    Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+    Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+    Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+    Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+    Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+    Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+    Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+    Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+    Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+    Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+    Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+  }
+
   private static KafkaIndexTaskTuningConfig copy(KafkaIndexTaskTuningConfig config)
   {
     return new KafkaIndexTaskTuningConfig(
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index b30b75ee7886..b6e7b3406771 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -50,6 +50,7 @@
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClient;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -62,6 +63,7 @@
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
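(Aside on the technique, not part of the patch: the two tests added to KafkaIndexTaskTuningConfigTest round-trip a config through JSON in both directions to prove that adding or removing a field stays wire-compatible. A minimal standalone sketch of the same idea follows, using hypothetical ConfigV1/ConfigV2 stand-ins rather than the real tuning configs, and assuming an ObjectMapper that ignores unknown properties, which is how Druid's DefaultObjectMapper behaves.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SerdeCompatSketch
{
  static class ConfigV1
  {
    @JsonProperty
    final Integer maxRowsInMemory;

    @JsonCreator
    ConfigV1(@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory)
    {
      this.maxRowsInMemory = maxRowsInMemory;
    }
  }

  // The "modified" flavor of the same config with one extra field, mirroring
  // what TestModifiedKafkaIndexTaskTuningConfig does for the real class.
  static class ConfigV2
  {
    @JsonProperty
    final Integer maxRowsInMemory;
    @JsonProperty
    final String extra;

    @JsonCreator
    ConfigV2(
        @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
        @JsonProperty("extra") String extra
    )
    {
      this.maxRowsInMemory = maxRowsInMemory;
      this.extra = extra;
    }
  }

  public static void main(String[] args) throws Exception
  {
    // Emulate a mapper that tolerates unknown properties.
    ObjectMapper mapper = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    // Added field: an old payload deserializes into the new class; the new
    // field simply comes back null.
    ConfigV2 added = mapper.readValue(mapper.writeValueAsString(new ConfigV1(42)), ConfigV2.class);
    System.out.println(added.extra == null);     // true

    // Removed field: a new payload deserializes into the old class; the
    // unknown "extra" property is silently dropped.
    ConfigV1 removed = mapper.readValue(
        mapper.writeValueAsString(new ConfigV2(42, "extra string")),
        ConfigV1.class
    );
    System.out.println(removed.maxRowsInMemory); // 42
  }
}

End of aside.)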
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData; import org.apache.druid.java.util.common.DateTimes; @@ -106,6 +108,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -139,7 +142,6 @@ public class KafkaSupervisorTest extends EasyMockSupport private static int topicPostfix; private static ZkUtils zkUtils; - private final int numThreads; private KafkaSupervisor supervisor; @@ -259,7 +261,7 @@ public static void tearDownClass() throws IOException @Test public void testNoInitialState() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(); @@ -314,7 +316,7 @@ public void testNoInitialState() throws Exception @Test public void testSkipOffsetGaps() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(); @@ -342,7 +344,7 @@ public void testSkipOffsetGaps() throws Exception @Test public void testMultiTask() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -397,7 +399,7 @@ public void testMultiTask() throws Exception @Test public void testReplicas() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -452,7 +454,7 @@ public void testReplicas() throws Exception @Test public void testLateMessageRejectionPeriod() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -491,7 +493,7 @@ public void testLateMessageRejectionPeriod() throws Exception @Test public void testEarlyMessageRejectionPeriod() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H")); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, new Period("PT1H")); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -533,7 +535,7 @@ public void testEarlyMessageRejectionPeriod() throws Exception */ public void testLatestOffset() throws Exception { - supervisor = getSupervisor(1, 1, false, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null); addSomeEvents(1100); Capture captured = Capture.newInstance(); @@ -574,7 +576,7 @@ public void testLatestOffset() throws Exception */ public void testDatasourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(); @@ -583,7 +585,11 @@ public void testDatasourceMetadata() throws Exception expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new 
SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of(0, 1, 2)) + new SeekableStreamStartSequenceNumbers<>( + topic, + ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of(0, 1, 2) + ) ) ).anyTimes(); expect(taskQueue.add(capture(captured))).andReturn(true); @@ -613,14 +619,18 @@ public void testDatasourceMetadata() throws Exception @Test(expected = ISE.class) public void testBadMetadataOffsets() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of(0, 1, 2)) + new SeekableStreamStartSequenceNumbers<>( + topic, + ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of(0, 1, 2) + ) ) ).anyTimes(); replayAll(); @@ -630,47 +640,14 @@ public void testBadMetadataOffsets() throws Exception } @Test - public void testKillIncompatibleTasks() throws Exception + public void testDontKillTasksWithMismatchedDatasourceAndType() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); - // unexpected # of partitions (kill) + // different datasource (don't kill) Task id1 = createKafkaIndexTask( "id1", - DATASOURCE, - 1, - new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 10L)), - null, - null - ); - - // correct number of partitions and ranges (don't kill) - Task id2 = createKafkaIndexTask( - "id2", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)), - null, - null - ); - - // unexpected range on partition 2 (kill) - Task id3 = createKafkaIndexTask( - "id3", - DATASOURCE, - 1, - new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)), - null, - null - ); - - // different datasource (don't kill) - Task id4 = createKafkaIndexTask( - "id4", "other-datasource", 2, new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L), ImmutableSet.of()), @@ -680,8 +657,8 @@ public void testKillIncompatibleTasks() throws Exception ); // non KafkaIndexTask (don't kill) - Task id5 = new RealtimeIndexTask( - "id5", + Task id2 = new RealtimeIndexTask( + "id2", null, new FireDepartment( dataSchema, @@ -691,18 +668,12 @@ public void testKillIncompatibleTasks() throws Exception null ); - List existingTasks = ImmutableList.of(id1, id2, id3, id4, id5); + List existingTasks = ImmutableList.of(id1, id2); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); - 
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); - expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); - expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); - expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); - expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); - expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) .anyTimes(); @@ -714,18 +685,10 @@ public void testKillIncompatibleTasks() throws Exception null ) ).anyTimes(); - expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true)); - expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false)); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); - taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, killing task", "id3"); - expect(taskQueue.add(anyObject(Task.class))).andReturn(true); - - TreeMap> checkpoints = new TreeMap<>(); - checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) - .andReturn(Futures.immediateFuture(checkpoints)) - .times(2); + expect(taskQueue.add(anyObject(Task.class))).andReturn(true).anyTimes(); replayAll(); @@ -737,7 +700,7 @@ public void testKillIncompatibleTasks() throws Exception @Test public void testKillBadPartitionAssignment() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -841,10 +804,11 @@ public void testKillBadPartitionAssignment() throws Exception verifyAll(); } + @Test public void testRequeueTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -933,7 +897,7 @@ public void testRequeueTaskWhenFailed() throws Exception @Test public void testRequeueAdoptedTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); DateTime now = DateTimes.nowUtc(); @@ -1034,7 +998,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception @Test public void testQueueNextTasksOnSuccess() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1136,7 +1100,7 @@ public void testBeginPublishAndQueueNextTasks() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1231,7 +1195,7 @@ public void testDiscoverExistingPublishingTask() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = 
getTestableSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Task task = createKafkaIndexTask( @@ -1346,7 +1310,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Task task = createKafkaIndexTask( @@ -1454,7 +1418,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(6); Task id1 = createKafkaIndexTask( @@ -1561,7 +1525,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception @Test public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1620,7 +1584,7 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1706,7 +1670,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1797,7 +1761,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception @Test(expected = IllegalStateException.class) public void testStopNotStarted() { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisor.stop(false); } @@ -1809,7 +1773,7 @@ public void testStop() taskRunner.unregisterListener(StringUtils.format("KafkaSupervisor-%s", DATASOURCE)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisor.start(); supervisor.stop(false); @@ -1823,7 +1787,7 @@ public void testStopGracefully() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -1935,7 +1899,7 @@ public void testResetNoTasks() throws Exception taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisor.start(); supervisor.runInternal(); verifyAll(); @@ -1952,7 +1916,7 @@ public void testResetNoTasks() throws Exception @Test public void testResetDataSourceMetadata() throws Exception { - 
supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -1968,7 +1932,11 @@ public void testResetDataSourceMetadata() throws Exception Capture captureDataSourceMetadata = EasyMock.newCapture(); KafkaDataSourceMetadata kafkaDataSourceMetadata = new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 1000L, 1, 1000L, 2, 1000L), ImmutableSet.of()) + new SeekableStreamStartSequenceNumbers<>( + topic, + ImmutableMap.of(0, 1000L, 1, 1000L, 2, 1000L), + ImmutableSet.of() + ) ); KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata( @@ -2004,7 +1972,7 @@ public void testResetDataSourceMetadata() throws Exception @Test public void testResetNoDataSourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -2040,7 +2008,7 @@ public void testResetRunningTasks() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -2138,7 +2106,7 @@ public void testResetRunningTasks() throws Exception public void testNoDataIngestionTasks() throws Exception { final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1S", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null); //not adding any events Task id1 = createKafkaIndexTask( "id1", @@ -2234,7 +2202,7 @@ public void testNoDataIngestionTasks() throws Exception public void testCheckpointForInactiveTaskGroup() throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException { - supervisor = getSupervisor(2, 1, true, "PT1S", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null); //not adding any events final Task id1 = createKafkaIndexTask( "id1", @@ -2327,8 +2295,16 @@ public void testCheckpointForInactiveTaskGroup() supervisor.checkpoint( 0, ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, fakeCheckpoints, fakeCheckpoints.keySet())) + new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + topic, + checkpoints.get(0), + ImmutableSet.of() + )), + new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + topic, + fakeCheckpoints, + fakeCheckpoints.keySet() + )) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -2346,7 +2322,7 @@ public void testCheckpointForInactiveTaskGroup() public void testCheckpointForUnknownTaskGroup() throws InterruptedException { - supervisor = getSupervisor(2, 1, true, "PT1S", null, null); + supervisor = 
getTestableSupervisor(2, 1, true, "PT1S", null, null); //not adding any events final Task id1 = createKafkaIndexTask( "id1", @@ -2407,8 +2383,16 @@ public void testCheckpointForUnknownTaskGroup() supervisor.checkpoint( 0, ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())) + new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + topic, + Collections.emptyMap(), + ImmutableSet.of() + )), + new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + topic, + Collections.emptyMap(), + ImmutableSet.of() + )) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -2435,7 +2419,7 @@ public void testCheckpointForUnknownTaskGroup() public void testCheckpointWithNullTaskGroupId() throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException { - supervisor = getSupervisor(1, 3, true, "PT1S", null, null); + supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null); //not adding any events final Task id1 = createKafkaIndexTask( "id1", @@ -2513,7 +2497,11 @@ public void testCheckpointWithNullTaskGroupId() supervisor.checkpoint( null, ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())), + new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + topic, + checkpoints.get(0), + ImmutableSet.of() + )), new KafkaDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(topic, newCheckpoints.get(0), newCheckpoints.get(0).keySet()) ) @@ -2529,7 +2517,7 @@ public void testCheckpointWithNullTaskGroupId() @Test public void testSuspendedNoRunningTasks() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost); addSomeEvents(1); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -2562,7 +2550,7 @@ public void testSuspendedRunningTasks() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, true, kafkaHost); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true, kafkaHost); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -2670,7 +2658,7 @@ public void testResetSuspended() throws Exception taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost); supervisor.start(); supervisor.runInternal(); verifyAll(); @@ -2687,7 +2675,7 @@ public void testResetSuspended() throws Exception public void testFailedInitializationAndRecovery() throws Exception { // Block the supervisor initialization with a bad hostname config, make sure this doesn't block the lifecycle - supervisor = getSupervisor( + supervisor = getTestableSupervisor( 1, 1, true, @@ -2793,7 +2781,7 @@ public void testFailedInitializationAndRecovery() throws Exception @Test public void testGetCurrentTotalStats() { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false, 
kafkaHost); + supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false, kafkaHost); supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition(0), ImmutableMap.of(0, 0L), @@ -2834,6 +2822,286 @@ public void testGetCurrentTotalStats() Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1")); } + @Test + public void testDoNotKillCompatibleTasks() + throws Exception + { + // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks + int numReplicas = 2; + supervisor = getTestableSupervisorCustomIsTaskCurrent( + numReplicas, + 1, + true, + "PT1H", + new Period("P1D"), + new Period("P1D"), + false, + true + ); + + addSomeEvents(1); + + Task task = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 0L, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + "topic", + ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), + null, + null + ); + + List existingTasks = ImmutableList.of(task); + + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); + EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) + .anyTimes(); + EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) + .anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true); + + TreeMap> checkpoints1 = new TreeMap<>(); + checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); + + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(numReplicas); + + replayAll(); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testKillIncompatibleTasks() + throws Exception + { + // This supervisor always returns false for isTaskCurrent -> it should kill its tasks + int numReplicas = 2; + supervisor = getTestableSupervisorCustomIsTaskCurrent( + numReplicas, + 1, + true, + "PT1H", + new Period("P1D"), + new Period("P1D"), + false, + false + ); + + addSomeEvents(1); + + Task task = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 0L, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + List existingTasks = ImmutableList.of(task); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + 
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); + EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) + .anyTimes(); + EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) + .anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true)); + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2); + + replayAll(); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testIsTaskCurrent() + { + DateTime minMessageTime = DateTimes.nowUtc(); + DateTime maxMessageTime = DateTimes.nowUtc().plus(10000); + + KafkaSupervisor supervisor = getSupervisor( + 2, + 1, + true, + "PT1H", + new Period("P1D"), + new Period("P1D"), + false, + kafkaHost, + dataSchema, + tuningConfig + ); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + 42, + ImmutableMap.of(0, 0L, 2, 0L), + Optional.of(minMessageTime), + Optional.of(maxMessageTime), + ImmutableSet.of("id1", "id2", "id3", "id4"), + ImmutableSet.of() + ); + + DataSchema modifiedDataSchema = getDataSchema("some other datasource"); + + KafkaSupervisorTuningConfig modifiedTuningConfig = new KafkaSupervisorTuningConfig( + 42, // This is different + null, + 50000, + null, + new Period("P1Y"), + new File("/test"), + null, + null, + true, + false, + null, + null, + null, + numThreads, + TEST_CHAT_THREADS, + TEST_CHAT_RETRIES, + TEST_HTTP_TIMEOUT, + TEST_SHUTDOWN_TIMEOUT, + null, + null, + null, + null, + null + ); + + KafkaIndexTask taskFromStorage = createKafkaIndexTask( + "id1", + 0, + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 0L, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + "topic", + ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), + minMessageTime, + maxMessageTime, + dataSchema + ); + + KafkaIndexTask taskFromStorageMismatchedDataSchema = createKafkaIndexTask( + "id2", + 0, + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 0L, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + "topic", + ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), + minMessageTime, + maxMessageTime, + modifiedDataSchema + ); + + KafkaIndexTask taskFromStorageMismatchedTuningConfig = createKafkaIndexTask( + "id3", + 0, + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 0L, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + "topic", + ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), + minMessageTime, + maxMessageTime, + dataSchema, + modifiedTuningConfig + ); + + KafkaIndexTask taskFromStorageMismatchedPartitionsWithTaskGroup = createKafkaIndexTask( + "id4", + 0, + 
new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 0L, 2, 6L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + "topic", + ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), + minMessageTime, + maxMessageTime, + dataSchema + ); + + EasyMock.expect(taskStorage.getTask("id1")) + .andReturn(Optional.of(taskFromStorage)) + .once(); + EasyMock.expect(taskStorage.getTask("id2")) + .andReturn(Optional.of(taskFromStorageMismatchedDataSchema)) + .once(); + EasyMock.expect(taskStorage.getTask("id3")) + .andReturn(Optional.of(taskFromStorageMismatchedTuningConfig)) + .once(); + EasyMock.expect(taskStorage.getTask("id4")) + .andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup)) + .once(); + + replayAll(); + + Assert.assertTrue(supervisor.isTaskCurrent(42, "id1")); + Assert.assertFalse(supervisor.isTaskCurrent(42, "id2")); + Assert.assertFalse(supervisor.isTaskCurrent(42, "id3")); + Assert.assertFalse(supervisor.isTaskCurrent(42, "id4")); + verifyAll(); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { //create topic manually @@ -2858,7 +3126,7 @@ private void addSomeEvents(int numEventsPerPartition) throws Exception } } - private KafkaSupervisor getSupervisor( + private KafkaSupervisor getTestableSupervisor( int replicas, int taskCount, boolean useEarliestOffset, @@ -2867,7 +3135,7 @@ private KafkaSupervisor getSupervisor( Period earlyMessageRejectionPeriod ) { - return getSupervisor( + return getTestableSupervisor( replicas, taskCount, useEarliestOffset, @@ -2879,7 +3147,7 @@ private KafkaSupervisor getSupervisor( ); } - private KafkaSupervisor getSupervisor( + private KafkaSupervisor getTestableSupervisor( int replicas, int taskCount, boolean useEarliestOffset, @@ -2954,6 +3222,168 @@ public KafkaIndexTaskClient build( ); } + /** + * Use when you want to mock the return value of SeekableStreamSupervisor#isTaskCurrent() + */ + private KafkaSupervisor getTestableSupervisorCustomIsTaskCurrent( + int replicas, + int taskCount, + boolean useEarliestOffset, + String duration, + Period lateMessageRejectionPeriod, + Period earlyMessageRejectionPeriod, + boolean suspended, + boolean isTaskCurrentReturn + ) + { + Map consumerProperties = new HashMap<>(); + consumerProperties.put("myCustomKey", "myCustomValue"); + consumerProperties.put("bootstrap.servers", kafkaHost); + consumerProperties.put("isolation.level", "read_committed"); + KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( + topic, + replicas, + taskCount, + new Period(duration), + consumerProperties, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + new Period("P1D"), + new Period("PT30S"), + useEarliestOffset, + new Period("PT30M"), + lateMessageRejectionPeriod, + earlyMessageRejectionPeriod + ); + + KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( + null, + null + ) + { + @Override + public KafkaIndexTaskClient build( + TaskInfoProvider taskInfoProvider, + String dataSource, + int numThreads, + Duration httpTimeout, + long numRetries + ) + { + Assert.assertEquals(TEST_CHAT_THREADS, numThreads); + Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); + Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); + return taskClient; + } + }; + + return new TestableKafkaSupervisorWithCustomIsTaskCurrent( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + objectMapper, + new KafkaSupervisorSpec( + dataSchema, + tuningConfig, + 
kafkaSupervisorIOConfig, + null, + suspended, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + objectMapper, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory + ), + rowIngestionMetersFactory, + isTaskCurrentReturn + ); + } + + /** + * Use when you don't want generateSequenceNumber overridden + */ + + private KafkaSupervisor getSupervisor( + int replicas, + int taskCount, + boolean useEarliestOffset, + String duration, + Period lateMessageRejectionPeriod, + Period earlyMessageRejectionPeriod, + boolean suspended, + String kafkaHost, + DataSchema dataSchema, + KafkaSupervisorTuningConfig tuningConfig + ) + { + Map consumerProperties = new HashMap<>(); + consumerProperties.put("myCustomKey", "myCustomValue"); + consumerProperties.put("bootstrap.servers", kafkaHost); + consumerProperties.put("isolation.level", "read_committed"); + KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( + topic, + replicas, + taskCount, + new Period(duration), + consumerProperties, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + new Period("P1D"), + new Period("PT30S"), + useEarliestOffset, + new Period("PT30M"), + lateMessageRejectionPeriod, + earlyMessageRejectionPeriod + ); + + KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( + null, + null + ) + { + @Override + public KafkaIndexTaskClient build( + TaskInfoProvider taskInfoProvider, + String dataSource, + int numThreads, + Duration httpTimeout, + long numRetries + ) + { + Assert.assertEquals(TEST_CHAT_THREADS, numThreads); + Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); + Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); + return taskClient; + } + }; + + return new KafkaSupervisor( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + objectMapper, + new KafkaSupervisorSpec( + dataSchema, + tuningConfig, + kafkaSupervisorIOConfig, + null, + suspended, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + objectMapper, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory + ), + rowIngestionMetersFactory + ); + } + private static DataSchema getDataSchema(String dataSource) { List dimensions = new ArrayList<>(); @@ -2998,11 +3428,55 @@ private KafkaIndexTask createKafkaIndexTask( DateTime minimumMessageTime, DateTime maximumMessageTime ) + { + return createKafkaIndexTask( + id, + taskGroupId, + startPartitions, + endPartitions, + minimumMessageTime, + maximumMessageTime, + getDataSchema(dataSource) + ); + } + + private KafkaIndexTask createKafkaIndexTask( + String id, + int taskGroupId, + SeekableStreamStartSequenceNumbers startPartitions, + SeekableStreamEndSequenceNumbers endPartitions, + DateTime minimumMessageTime, + DateTime maximumMessageTime, + DataSchema schema + ) + { + return createKafkaIndexTask( + id, + taskGroupId, + startPartitions, + endPartitions, + minimumMessageTime, + maximumMessageTime, + schema, + (KafkaIndexTaskTuningConfig) tuningConfig.convertToTaskTuningConfig() + ); + } + + private KafkaIndexTask createKafkaIndexTask( + String id, + int taskGroupId, + SeekableStreamStartSequenceNumbers startPartitions, + SeekableStreamEndSequenceNumbers endPartitions, + DateTime minimumMessageTime, + DateTime maximumMessageTime, + DataSchema schema, + KafkaIndexTaskTuningConfig tuningConfig + ) { return new KafkaIndexTask( id, null, - getDataSchema(dataSource), + 
schema, tuningConfig, new KafkaIndexTaskIOConfig( taskGroupId, @@ -3054,7 +3528,6 @@ public String getDataSource() { return dataSource; } - } private static class TestableKafkaSupervisor extends KafkaSupervisor @@ -3084,7 +3557,9 @@ public TestableKafkaSupervisor( protected String generateSequenceName( Map startPartitions, Optional minimumMessageTime, - Optional maximumMessageTime + Optional maximumMessageTime, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig ) { final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()); @@ -3092,5 +3567,37 @@ protected String generateSequenceName( } } + private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor + { + private boolean isTaskCurrentReturn; + + public TestableKafkaSupervisorWithCustomIsTaskCurrent( + TaskStorage taskStorage, + TaskMaster taskMaster, + IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + KafkaIndexTaskClientFactory taskClientFactory, + ObjectMapper mapper, + KafkaSupervisorSpec spec, + RowIngestionMetersFactory rowIngestionMetersFactory, + boolean isTaskCurrentReturn + ) + { + super( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + mapper, + spec, + rowIngestionMetersFactory + ); + this.isTaskCurrentReturn = isTaskCurrentReturn; + } + @Override + public boolean isTaskCurrent(int taskGroupId, String taskId) + { + return isTaskCurrentReturn; + } + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java new file mode 100644 index 000000000000..3cc124f6afd2 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.indexing.kafka.test;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+@JsonTypeName("KafkaTuningConfig")
+public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuningConfig
+{
+  private final String extra;
+
+  @JsonCreator
+  public TestModifiedKafkaIndexTaskTuningConfig(
+      @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
+      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+      @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
+      @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
+      @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
+      @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
+      // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
+      @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
+      @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
+      @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
+      @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
+      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
+      @JsonProperty("extra") String extra
+  )
+  {
+    super(
+        maxRowsInMemory,
+        maxBytesInMemory,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        true,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+    this.extra = extra;
+  }
+
+  @JsonProperty("extra")
+  public String getExtra()
+  {
+    return extra;
+  }
+}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 2983324b0ce4..57ffebcb676e 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
+import
org.apache.druid.indexing.kinesis.test.TestModifiedKinesisIndexTaskTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.TuningConfig; @@ -34,6 +35,7 @@ import org.junit.rules.ExpectedException; import java.io.File; +import java.io.IOException; public class KinesisIndexTaskTuningConfigTest { @@ -130,6 +132,121 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertFalse(config.isResetOffsetAutomatically()); } + @Test + public void testSerdeWithModifiedTuningConfigAddedField() throws IOException + { + KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig( + 1, + 3L, + 2, + 100L, + new Period("PT3S"), + new File("/tmp/xxx"), + 4, + new IndexSpec(), + true, + true, + 5L, + true, + false, + 1000, + 1000, + 500, + null, + 42, + null, + false, + 500, + 500, + 6000, + new Period("P3D") + ); + + String serialized = mapper.writeValueAsString(base); + TestModifiedKinesisIndexTaskTuningConfig deserialized = + mapper.readValue(serialized, TestModifiedKinesisIndexTaskTuningConfig.class); + + Assert.assertEquals(null, deserialized.getExtra()); + Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory()); + Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory()); + Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment()); + Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows()); + Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod()); + Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory()); + Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists()); + Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec()); + Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly()); + Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions()); + Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout()); + Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically()); + Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory()); + Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod()); + Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions()); + Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); + Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); + Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait()); + Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); + Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize()); + Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll()); + } + + @Test + public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException + { + KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig( + 1, + 3L, + 2, + 100L, + new Period("PT3S"), + new File("/tmp/xxx"), + 4, + new IndexSpec(), + true, + true, + 5L, + true, + false, + 1000, + 1000, + 500, + null, + 42, + null, + false, + 500, + 500, + 6000, + new Period("P3D") + ); + + String 
serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool")); + KinesisIndexTaskTuningConfig deserialized = + mapper.readValue(serialized, KinesisIndexTaskTuningConfig.class); + + Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory()); + Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory()); + Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment()); + Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows()); + Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod()); + Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory()); + Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists()); + Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec()); + Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly()); + Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions()); + Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout()); + Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically()); + Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory()); + Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod()); + Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions()); + Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); + Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); + Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait()); + Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); + Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize()); + Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll()); + } + @Test public void testResetOffsetAndSkipSequenceNotBothTrue() throws Exception { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 4b138f6eebe2..cc5ae9ee0704 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -45,6 +45,7 @@ import org.apache.druid.indexing.kinesis.KinesisIndexTaskClient; import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory; import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig; +import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig; import org.apache.druid.indexing.kinesis.KinesisRecordSupplier; import org.apache.druid.indexing.kinesis.KinesisSequenceNumber; import org.apache.druid.indexing.overlord.DataSourceMetadata; @@ -57,6 +58,7 @@ import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; +import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; @@ -67,6 +69,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.TestHelper; @@ -111,8 +114,6 @@ public class KinesisSupervisorTest extends EasyMockSupport { - - private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); private static final String DATASOURCE = "testDS"; private static final int TEST_CHAT_THREADS = 3; @@ -208,7 +209,7 @@ public void tearDownTest() @Test public void testNoInitialState() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -273,7 +274,7 @@ public void testNoInitialState() throws Exception @Test public void testMultiTask() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -330,7 +331,7 @@ public void testMultiTask() throws Exception @Test public void testReplicas() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -404,7 +405,7 @@ public void testReplicas() throws Exception @Test public void testLateMessageRejectionPeriod() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -453,7 +454,7 @@ public void testLateMessageRejectionPeriod() throws Exception @Test public void testEarlyMessageRejectionPeriod() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H")); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, new Period("PT1H")); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -507,7 +508,7 @@ public void testEarlyMessageRejectionPeriod() throws Exception */ public void testDatasourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -528,7 +529,11 @@ public void testDatasourceMetadata() throws Exception EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2", shardId0, "1"), ImmutableSet.of()) + new SeekableStreamStartSequenceNumbers<>( + stream, + 
ImmutableMap.of(shardId1, "2", shardId0, "1"), + ImmutableSet.of() + ) ) ).anyTimes(); @@ -555,7 +560,7 @@ public void testDatasourceMetadata() throws Exception @Test(expected = ISE.class) public void testBadMetadataOffsets() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -574,7 +579,11 @@ public void testBadMetadataOffsets() throws Exception EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "101", shardId0, "-1"), ImmutableSet.of()) + new SeekableStreamStartSequenceNumbers<>( + stream, + ImmutableMap.of(shardId1, "101", shardId0, "-1"), + ImmutableSet.of() + ) ) ).anyTimes(); replayAll(); @@ -584,9 +593,9 @@ public void testBadMetadataOffsets() throws Exception } @Test - public void testKillIncompatibleTasks() throws Exception + public void testDontKillTasksWithMismatchedDatasourceAndType() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -600,48 +609,9 @@ public void testKillIncompatibleTasks() throws Exception supervisorRecordSupplier.seek(anyObject(), anyString()); expectLastCall().anyTimes(); - // unexpected # of partitions (kill) + // different datasource (don't kill) Task id1 = createKinesisIndexTask( "id1", - DATASOURCE, - 1, - new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")), - null, - null - ); - - // correct number of partitions and ranges (don't kill) - Task id2 = createKinesisIndexTask( - "id2", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>( - stream, - ImmutableMap.of(shardId0, "0", shardId1, "0"), ImmutableSet.of(shardId0, shardId1) - ), - new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "12")), - null, - null - ); - - // unexpected range on partition 2 (kill) - Task id3 = createKinesisIndexTask( - "id3", - DATASOURCE, - 1, - new SeekableStreamStartSequenceNumbers<>( - stream, - ImmutableMap.of(shardId0, "0", shardId1, "1"), ImmutableSet.of(shardId0, shardId1) - ), - new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "11")), - null, - null - ); - - // different datasource (don't kill) - Task id4 = createKinesisIndexTask( - "id4", "other-datasource", 2, new SeekableStreamStartSequenceNumbers<>( @@ -655,8 +625,8 @@ public void testKillIncompatibleTasks() throws Exception ); // non KinesisIndexTask (don't kill) - Task id5 = new RealtimeIndexTask( - "id5", + Task id2 = new RealtimeIndexTask( + "id2", null, new FireDepartment( dataSchema, @@ -666,18 +636,12 @@ public void testKillIncompatibleTasks() throws Exception null ); - List existingTasks = ImmutableList.of(id1, id2, id3, id4, id5); + List existingTasks = ImmutableList.of(id1, id2); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); 
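The reworked supervisor tests in this file are almost entirely EasyMock choreography: each collaborator (taskStorage, taskMaster, taskClient, taskQueue) is a mock whose permitted calls are scripted up front, and replayAll()/verifyAll() from EasyMockSupport flip every created mock into playback mode and check the expectations afterwards. As a reading aid, here is a minimal, self-contained sketch of that record/replay/verify lifecycle; the TaskCatalog interface is invented for illustration and is not part of Druid.

import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

import java.util.Collections;
import java.util.List;

public class EasyMockLifecycleSketch
{
  // Hypothetical collaborator, standing in for something like TaskStorage above.
  interface TaskCatalog
  {
    List<String> getActiveTaskIds();
  }

  public static void main(String[] args)
  {
    // 1) Record: declare the calls the code under test may make, and their answers.
    TaskCatalog catalog = createMock(TaskCatalog.class);
    expect(catalog.getActiveTaskIds()).andReturn(Collections.singletonList("id1")).anyTimes();

    // 2) Replay: switch the mock from recording to playback.
    replay(catalog);

    // 3) Exercise: the mock now answers as scripted.
    System.out.println(catalog.getActiveTaskIds()); // prints [id1]

    // 4) Verify: fails on unmet expectations; anyTimes() tolerates zero calls.
    verify(catalog);
  }
}

With default (non-nice) mocks, a call that was never scripted fails the test at the moment it is invoked, which is why the expectations in these hunks shrink in lockstep with the reduced task lists.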
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); - EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); - EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); - EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) .anyTimes(); @@ -689,24 +653,9 @@ public void testKillIncompatibleTasks() throws Exception null ) ).anyTimes(); - EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true)); - EasyMock.expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false)); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); - taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, killing task", "id3"); - - EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true); - - TreeMap> checkpoints = new TreeMap<>(); - checkpoints.put(0, ImmutableMap.of( - shardId0, - "0", - shardId1, - "0" - )); - EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) - .andReturn(Futures.immediateFuture(checkpoints)) - .times(2); + EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2); replayAll(); supervisor.start(); @@ -717,7 +666,14 @@ public void testKillIncompatibleTasks() throws Exception @Test public void testKillBadPartitionAssignment() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor( + 1, + 2, + true, + "PT1H", + null, + null + ); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -770,17 +726,8 @@ public void testKillBadPartitionAssignment() throws Exception null, null ); - Task id5 = createKinesisIndexTask( - "id5", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of(shardId0)), - new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")), - null, - null - ); - List existingTasks = ImmutableList.of(id1, id2, id3, id4, id5); + List existingTasks = ImmutableList.of(id1, id2, id3, id4); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); @@ -790,12 +737,10 @@ public void testKillBadPartitionAssignment() throws Exception EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id4")).andReturn(Optional.of(TaskStatus.running("id4"))).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id5")).andReturn(Optional.of(TaskStatus.running("id5"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); 
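The stubs here answer every async client call with Futures.immediateFuture(...), and the supervisor (later in this diff) gathers such futures with Futures.successfulAsList(...).get(timeout) and treats null entries as tasks that failed to respond — the "failed to return status, killing task" shutdown above. That null-on-failure contract is Guava's, not anything Druid-specific; a small sketch, assuming only Guava on the classpath:

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.List;
import java.util.concurrent.ExecutionException;

public class SuccessfulAsListSketch
{
  public static void main(String[] args) throws InterruptedException, ExecutionException
  {
    ListenableFuture<String> healthy = Futures.immediateFuture("READING");
    ListenableFuture<String> broken = Futures.immediateFailedFuture(new RuntimeException("task unreachable"));

    // Unlike allAsList, successfulAsList never fails as a whole:
    // failed or cancelled inputs become null entries in the combined result.
    List<String> results = Futures.successfulAsList(ImmutableList.of(healthy, broken)).get();

    for (int i = 0; i < results.size(); i++) {
      if (results.get(i) == null) {
        // This is the case the supervisor maps to "failed to return status, killing task".
        System.out.println("future " + i + ": failed");
      } else {
        System.out.println("future " + i + ": " + results.get(i));
      }
    }
  }
}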
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); EasyMock.expect(taskStorage.getTask("id4")).andReturn(Optional.of(id4)).anyTimes(); - EasyMock.expect(taskStorage.getTask("id5")).andReturn(Optional.of(id5)).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) .anyTimes(); @@ -808,24 +753,26 @@ public void testKillBadPartitionAssignment() throws Exception ) ).anyTimes(); EasyMock.expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(true)); - EasyMock.expect(taskClient.stopAsync("id4", false)).andReturn(Futures.immediateFuture(false)); - EasyMock.expect(taskClient.stopAsync("id5", false)).andReturn(Futures.immediateFuture((Boolean) null)); TreeMap> checkpoints1 = new TreeMap<>(); checkpoints1.put(0, ImmutableMap.of(shardId1, "0")); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(shardId0, "0")); + TreeMap> checkpoints4 = new TreeMap<>(); + checkpoints4.put(0, ImmutableMap.of(shardId0, "0")); EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean())) .andReturn(Futures.immediateFuture(checkpoints1)) .times(1); EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) .andReturn(Futures.immediateFuture(checkpoints2)) .times(1); + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id4"), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints4)) + .times(1); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); - taskQueue.shutdown("id4", "Task [%s] failed to stop in a timely manner, killing task", "id4"); - taskQueue.shutdown("id5", "Task [%s] failed to stop in a timely manner, killing task", "id5"); + taskQueue.shutdown("id4", "Task [%s] failed to return status, killing task", "id4"); replayAll(); supervisor.start(); @@ -836,7 +783,7 @@ public void testKillBadPartitionAssignment() throws Exception @Test public void testRequeueTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); @@ -945,7 +892,7 @@ public void testRequeueTaskWhenFailed() throws Exception @Test public void testRequeueAdoptedTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); @@ -964,7 +911,11 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "stream", + ImmutableMap.of(shardId1, "0", shardId0, "0"), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "stream", ImmutableMap.of( @@ -1075,7 +1026,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception @Test public void 
testQueueNextTasksOnSuccess() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); @@ -1196,7 +1147,7 @@ public void testBeginPublishAndQueueNextTasks() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -1332,7 +1283,7 @@ public void testDiscoverExistingPublishingTask() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -1351,7 +1302,11 @@ public void testDiscoverExistingPublishingTask() throws Exception "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "stream", + ImmutableMap.of(shardId1, "0", shardId0, "0"), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "stream", ImmutableMap.of( @@ -1481,7 +1436,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); @@ -1499,7 +1454,11 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "stream", + ImmutableMap.of(shardId1, "0", shardId0, "0"), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "stream", ImmutableMap.of( @@ -1620,7 +1579,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -1790,7 +1749,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception @Test public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); @@ -1869,7 +1828,7 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception { final TaskLocation location = new 
TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); @@ -1974,7 +1933,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null); + supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); @@ -2090,7 +2049,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception @Test(expected = IllegalStateException.class) public void testStopNotStarted() { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisor.stop(false); } @@ -2105,7 +2064,7 @@ public void testStop() taskRunner.unregisterListener(StringUtils.format("KinesisSupervisor-%s", DATASOURCE)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisor.start(); supervisor.stop(false); @@ -2119,7 +2078,7 @@ public void testStopGracefully() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); @@ -2137,7 +2096,11 @@ public void testStopGracefully() throws Exception "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "stream", + ImmutableMap.of(shardId1, "0", shardId0, "0"), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "stream", ImmutableMap.of( @@ -2295,7 +2258,7 @@ public void testResetNoTasks() throws Exception taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisor.start(); @@ -2315,7 +2278,7 @@ public void testResetNoTasks() throws Exception public void testResetDataSourceMetadata() throws Exception { expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -2392,7 +2355,7 @@ public void testResetDataSourceMetadata() throws Exception public void testResetNoDataSourceMetadata() 
throws Exception { expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); @@ -2428,7 +2391,7 @@ public void testResetRunningTasks() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); @@ -2446,7 +2409,11 @@ public void testResetRunningTasks() throws Exception "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "stream", + ImmutableMap.of(shardId1, "0", shardId0, "0"), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "stream", ImmutableMap.of( @@ -2579,7 +2546,7 @@ public void testResetRunningTasks() throws Exception public void testNoDataIngestionTasks() throws Exception { final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); //not adding any events Task id1 = createKinesisIndexTask( @@ -2724,7 +2691,7 @@ public void testNoDataIngestionTasks() throws Exception public void testCheckpointForInactiveTaskGroup() throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException { - supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false); //not adding any events final Task id1; id1 = createKinesisIndexTask( @@ -2862,7 +2829,11 @@ public void testCheckpointForInactiveTaskGroup() new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), checkpoints.get(0).keySet()) ), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, fakeCheckpoints, ImmutableSet.of())) + new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + stream, + fakeCheckpoints, + ImmutableSet.of() + )) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -2880,7 +2851,7 @@ public void testCheckpointForInactiveTaskGroup() public void testCheckpointForUnknownTaskGroup() throws InterruptedException { - supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false); supervisorRecordSupplier.assign(anyObject()); @@ -2984,8 +2955,16 @@ public void testCheckpointForUnknownTaskGroup() supervisor.checkpoint( 0, ((KinesisIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of())), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of())) + new 
KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + stream, + Collections.emptyMap(), + ImmutableSet.of() + )), + new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + stream, + Collections.emptyMap(), + ImmutableSet.of() + )) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -3011,7 +2990,7 @@ public void testCheckpointForUnknownTaskGroup() public void testCheckpointWithNullTaskGroupId() throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException { - supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false); + supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null, false); //not adding any events final Task id1 = createKinesisIndexTask( "id1", @@ -3100,8 +3079,16 @@ public void testCheckpointWithNullTaskGroupId() supervisor.checkpoint( null, ((KinesisIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), ImmutableSet.of())), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, newCheckpoints.get(0), ImmutableSet.of())) + new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + stream, + checkpoints.get(0), + ImmutableSet.of() + )), + new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( + stream, + newCheckpoints.get(0), + ImmutableSet.of() + )) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -3115,7 +3102,7 @@ public void testCheckpointWithNullTaskGroupId() @Test public void testSuspendedNoRunningTasks() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true); expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3148,7 +3135,7 @@ public void testSuspendedRunningTasks() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, true); + supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true); supervisorRecordSupplier.assign(anyObject()); expectLastCall().anyTimes(); @@ -3168,7 +3155,11 @@ public void testSuspendedRunningTasks() throws Exception "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "stream", + ImmutableMap.of(shardId1, "0", shardId0, "0"), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "stream", ImmutableMap.of( @@ -3313,7 +3304,7 @@ public void testResetSuspended() throws Exception taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true); + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true); supervisor.start(); supervisor.runInternal(); verifyAll(); @@ -3330,7 +3321,7 @@ public void testResetSuspended() throws Exception @Test public void testGetCurrentTotalStats() { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false); + supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false); supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition("0"), 
ImmutableMap.of("0", "0"), @@ -3371,81 +3362,393 @@ public void testGetCurrentTotalStats() Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1")); } - private KinesisSupervisor getSupervisor( - int replicas, - int taskCount, - boolean useEarliestOffset, - String duration, - Period lateMessageRejectionPeriod, - Period earlyMessageRejectionPeriod - ) + @Test + public void testDoNotKillCompatibleTasks() + throws InterruptedException, EntryExistsException, ExecutionException, TimeoutException, JsonProcessingException { - return getSupervisor( - replicas, - taskCount, - useEarliestOffset, - duration, - lateMessageRejectionPeriod, - earlyMessageRejectionPeriod, + // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks + int numReplicas = 2; + supervisor = getTestableSupervisorCustomIsTaskCurrent( + numReplicas, + 1, + true, + "PT1H", + new Period("P1D"), + new Period("P1D"), false, + 42, + 1000, + true + ); + + supervisorRecordSupplier.assign(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionIds(stream)) + .andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getAssignment()) + .andReturn(ImmutableSet.of(shard1Partition, shard0Partition)) + .anyTimes(); + supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); + supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); + EasyMock.expectLastCall().anyTimes(); + + Task task = createKinesisIndexTask( + "id2", + DATASOURCE, + 0, + new SeekableStreamStartSequenceNumbers<>( + stream, + ImmutableMap.of( + shardId0, + "0", + shardId1, + "0" + ), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of( + shardId0, + "1", + shardId1, + "12" + )), null, null ); - } - private KinesisSupervisor getSupervisor( - int replicas, - int taskCount, - boolean useEarliestOffset, - String duration, - Period lateMessageRejectionPeriod, - Period earlyMessageRejectionPeriod, - boolean suspended, - Integer recordsPerFetch, - Integer fetchDelayMillis + List existingTasks = ImmutableList.of(task); - ) + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(task)).anyTimes(); + EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) + .anyTimes(); + EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) + .anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KinesisDataSourceMetadata( + null + ) + ).anyTimes(); + + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class)); + EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true); + + TreeMap> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of( + shardId0, + "0", + shardId1, + "0" + )); + + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(numReplicas); + + replayAll(); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testKillIncompatibleTasks() + throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException, EntryExistsException { - KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( - stream, - "awsEndpoint", - null, - replicas, - taskCount, - new Period(duration), + // This supervisor always returns false for isTaskCurrent -> it should kill its tasks + int numReplicas = 2; + supervisor = getTestableSupervisorCustomIsTaskCurrent( + numReplicas, + 1, + true, + "PT1H", new Period("P1D"), - new Period("PT30S"), - useEarliestOffset, - new Period("PT30M"), - lateMessageRejectionPeriod, - earlyMessageRejectionPeriod, - recordsPerFetch, - fetchDelayMillis, - null, - null, + new Period("P1D"), + false, + 42, + 1000, false ); + supervisorRecordSupplier.assign(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionIds(stream)) + .andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getAssignment()) + .andReturn(ImmutableSet.of(shard1Partition, shard0Partition)) + .anyTimes(); + supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); + supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); + EasyMock.expectLastCall().anyTimes(); - KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( + Task task = createKinesisIndexTask( + "id1", + DATASOURCE, + 0, + new SeekableStreamStartSequenceNumbers<>( + stream, + ImmutableMap.of( + shardId0, + "0", + shardId1, + "0" + ), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of( + shardId0, + "1", + shardId1, + "12" + )), null, null - ) - { - @Override - public KinesisIndexTaskClient build( - TaskInfoProvider taskInfoProvider, - String dataSource, - int numThreads, - Duration httpTimeout, - long numRetries - ) - { - Assert.assertEquals(TEST_CHAT_THREADS, numThreads); - Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); - Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); - return taskClient; - } - }; + ); + + List existingTasks = ImmutableList.of(task); + + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + 
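The two custom-isTaskCurrent tests pin the method to a constant so the kill/keep branches can be exercised in isolation; the real method (see the SeekableStreamSupervisor hunk near the end of this diff) decides by recomputing a sequence name from the start offsets, the message-time bounds, the data schema, and the tuning config, and comparing it with the task group's name. A toy illustration of that fingerprinting idea follows — the names and the exact hash construction are invented for this sketch, not Druid's actual algorithm.

import com.google.common.collect.ImmutableMap;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

public class SequenceNameSketch
{
  // Fingerprint everything that makes a task "current": if any input drifts,
  // the name changes and the task becomes a candidate to be stopped and replaced.
  static String generateSequenceName(Map<String, String> startOffsets, String dataSchemaJson, String tuningConfigJson)
  {
    // Sort partitions so the hash is independent of map iteration order.
    String canonical = new TreeMap<>(startOffsets) + dataSchemaJson + tuningConfigJson;
    String hash = Hashing.sha1().hashString(canonical, StandardCharsets.UTF_8).toString();
    return "sequenceName_" + hash.substring(0, 15);
  }

  public static void main(String[] args)
  {
    Map<String, String> offsets = ImmutableMap.of("shardId-000000000000", "0");
    String groupName = generateSequenceName(offsets, "{schema-v1}", "{tuning-v1}");
    String taskName = generateSequenceName(offsets, "{schema-v1}", "{tuning-v2}");
    System.out.println(groupName.equals(taskName)); // false -> the task is not current
  }
}

Recomputing the fingerprint on both sides with the current class definitions is the point of the production change: a task serialized by an older build deserializes into the updated config classes, hashes the same way as the supervisor's own configuration, and is left running instead of being killed.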
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); + EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) + .anyTimes(); + EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) + .anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KinesisDataSourceMetadata( + null + ) + ).anyTimes(); + EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true)); + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2); + + replayAll(); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testIsTaskCurrent() + { + DateTime minMessageTime = DateTimes.nowUtc(); + DateTime maxMessageTime = DateTimes.nowUtc().plus(10000); + + KinesisSupervisor supervisor = getSupervisor( + 1, + 1, + true, + "PT1H", + new Period("P1D"), + new Period("P1D"), + false, + 42, + 42, + dataSchema, + tuningConfig + ); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + 42, + ImmutableMap.of(shardId1, "3"), + Optional.of(minMessageTime), + Optional.of(maxMessageTime), + ImmutableSet.of("id1", "id2", "id3", "id4"), + ImmutableSet.of() + ); + + DataSchema modifiedDataSchema = getDataSchema("some other datasource"); + + KinesisSupervisorTuningConfig modifiedTuningConfig = new KinesisSupervisorTuningConfig( + 1000, + null, + 50000, + null, + new Period("P1Y"), + new File("/test"), + null, + null, + true, + false, + null, + null, + null, + null, + numThreads, + TEST_CHAT_THREADS, + TEST_CHAT_RETRIES, + TEST_HTTP_TIMEOUT, + TEST_SHUTDOWN_TIMEOUT, + null, + null, + null, + 5000, + null, + null, + null, + null, + 42, // This property is different from tuningConfig + null + ); + + KinesisIndexTask taskFromStorage = createKinesisIndexTask( + "id1", + 0, + new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of( + shardId1, + "3" + ), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of( + shardId1, + KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER + )), + minMessageTime, + maxMessageTime, + dataSchema + ); + + KinesisIndexTask taskFromStorageMismatchedDataSchema = createKinesisIndexTask( + "id2", + 0, + new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of( + shardId1, + "3" + ), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of( + shardId1, + KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER + )), + minMessageTime, + maxMessageTime, + modifiedDataSchema + ); + + KinesisIndexTask taskFromStorageMismatchedTuningConfig = createKinesisIndexTask( + "id3", + 0, + new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of( + shardId1, + "3" + ), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of( + shardId1, + KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER + )), + minMessageTime, + maxMessageTime, + dataSchema, + modifiedTuningConfig + ); + + KinesisIndexTask taskFromStorageMismatchedPartitionsWithTaskGroup = createKinesisIndexTask( + "id4", + 0, + new SeekableStreamStartSequenceNumbers<>( + "stream", + ImmutableMap.of( + shardId1, + "4" // this is the mismatch + ), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of( + shardId1, 
+ KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER + )), + minMessageTime, + maxMessageTime, + dataSchema + ); + + EasyMock.expect(taskStorage.getTask("id1")) + .andReturn(Optional.of(taskFromStorage)) + .once(); + EasyMock.expect(taskStorage.getTask("id2")) + .andReturn(Optional.of(taskFromStorageMismatchedDataSchema)) + .once(); + EasyMock.expect(taskStorage.getTask("id3")) + .andReturn(Optional.of(taskFromStorageMismatchedTuningConfig)) + .once(); + EasyMock.expect(taskStorage.getTask("id4")) + .andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup)) + .once(); + + replayAll(); + + Assert.assertTrue(supervisor.isTaskCurrent(42, "id1")); + Assert.assertFalse(supervisor.isTaskCurrent(42, "id2")); + Assert.assertFalse(supervisor.isTaskCurrent(42, "id3")); + Assert.assertFalse(supervisor.isTaskCurrent(42, "id4")); + verifyAll(); + } + + private KinesisSupervisor getTestableSupervisor( + int replicas, + int taskCount, + boolean useEarliestOffset, + String duration, + Period lateMessageRejectionPeriod, + Period earlyMessageRejectionPeriod, + boolean suspended + ) + { + KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( + stream, + "awsEndpoint", + null, + replicas, + taskCount, + new Period(duration), + new Period("P1D"), + new Period("PT30S"), + useEarliestOffset, + new Period("PT30M"), + lateMessageRejectionPeriod, + earlyMessageRejectionPeriod, + null, + null, + null, + null, + false + ); + + KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( + null, + null + ) + { + @Override + public KinesisIndexTaskClient build( + TaskInfoProvider taskInfoProvider, + String dataSource, + int numThreads, + Duration httpTimeout, + long numRetries + ) + { + Assert.assertEquals(TEST_CHAT_THREADS, numThreads); + Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); + Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); + return taskClient; + } + }; return new TestableKinesisSupervisor( taskStorage, @@ -3473,105 +3776,194 @@ public KinesisIndexTaskClient build( ); } - private static DataSchema getDataSchema(String dataSource) + private KinesisSupervisor getTestableSupervisor( + int replicas, + int taskCount, + boolean useEarliestOffset, + String duration, + Period lateMessageRejectionPeriod, + Period earlyMessageRejectionPeriod + ) { - List dimensions = new ArrayList<>(); - dimensions.add(StringDimensionSchema.create("dim1")); - dimensions.add(StringDimensionSchema.create("dim2")); - - return new DataSchema( - dataSource, - objectMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("timestamp", "iso", null), - new DimensionsSpec( - dimensions, - null, - null - ), - new JSONPathSpec(true, ImmutableList.of()), - ImmutableMap.of() - ), - StandardCharsets.UTF_8.name() - ), - Map.class - ), - new AggregatorFactory[]{new CountAggregatorFactory("rows")}, - new UniformGranularitySpec( - Granularities.HOUR, - Granularities.NONE, - ImmutableList.of() - ), + return getTestableSupervisor( + replicas, + taskCount, + useEarliestOffset, + duration, + lateMessageRejectionPeriod, + earlyMessageRejectionPeriod, + false, null, - objectMapper + null ); } - private static List jb( - String timestamp, - String dim1, - String dim2, - String dimLong, - String dimFloat, - String met1 + private KinesisSupervisor getTestableSupervisor( + int replicas, + int taskCount, + boolean useEarliestOffset, + String duration, + Period lateMessageRejectionPeriod, + Period earlyMessageRejectionPeriod, + boolean 
suspended, + Integer recordsPerFetch, + Integer fetchDelayMillis ) { - try { - return Collections.singletonList(new ObjectMapper().writeValueAsBytes( - ImmutableMap.builder() - .put("timestamp", timestamp) - .put("dim1", dim1) - .put("dim2", dim2) - .put("dimLong", dimLong) - .put("dimFloat", dimFloat) - .put("met1", met1) - .build() - )); - } - catch (Exception e) { - throw new RuntimeException(e); - } + KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( + stream, + "awsEndpoint", + null, + replicas, + taskCount, + new Period(duration), + new Period("P1D"), + new Period("PT30S"), + useEarliestOffset, + new Period("PT30M"), + lateMessageRejectionPeriod, + earlyMessageRejectionPeriod, + recordsPerFetch, + fetchDelayMillis, + null, + null, + false + ); + + KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( + null, + null + ) + { + @Override + public KinesisIndexTaskClient build( + TaskInfoProvider taskInfoProvider, + String dataSource, + int numThreads, + Duration httpTimeout, + long numRetries + ) + { + Assert.assertEquals(TEST_CHAT_THREADS, numThreads); + Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); + Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); + return taskClient; + } + }; + + return new TestableKinesisSupervisor( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + objectMapper, + new KinesisSupervisorSpec( + dataSchema, + tuningConfig, + KinesisSupervisorIOConfig, + null, + suspended, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + objectMapper, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + null + ), + rowIngestionMetersFactory + ); } - private KinesisIndexTask createKinesisIndexTask( - String id, - String dataSource, - int taskGroupId, - SeekableStreamStartSequenceNumbers startPartitions, - SeekableStreamEndSequenceNumbers endPartitions, - DateTime minimumMessageTime, - DateTime maximumMessageTime + /** + * Use when you want to mock the return value of SeekableStreamSupervisor#isTaskCurrent() + */ + private KinesisSupervisor getTestableSupervisorCustomIsTaskCurrent( + int replicas, + int taskCount, + boolean useEarliestOffset, + String duration, + Period lateMessageRejectionPeriod, + Period earlyMessageRejectionPeriod, + boolean suspended, + Integer recordsPerFetch, + Integer fetchDelayMillis, + boolean isTaskCurrentReturn ) { - return new KinesisIndexTask( - id, + KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( + stream, + "awsEndpoint", null, - getDataSchema(dataSource), - tuningConfig, - new KinesisIndexTaskIOConfig( - 0, - "sequenceName-" + taskGroupId, - startPartitions, - endPartitions, - true, - minimumMessageTime, - maximumMessageTime, - "awsEndpoint", - null, - null, - null, - null, - false - ), - Collections.emptyMap(), + replicas, + taskCount, + new Period(duration), + new Period("P1D"), + new Period("PT30S"), + useEarliestOffset, + new Period("PT30M"), + lateMessageRejectionPeriod, + earlyMessageRejectionPeriod, + recordsPerFetch, + fetchDelayMillis, null, null, - rowIngestionMetersFactory, + false + ); + + KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( + null, null + ) + { + @Override + public KinesisIndexTaskClient build( + TaskInfoProvider taskInfoProvider, + String dataSource, + int numThreads, + Duration httpTimeout, + long numRetries + ) + { + 
Assert.assertEquals(TEST_CHAT_THREADS, numThreads); + Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); + Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); + return taskClient; + } + }; + + return new TestableKinesisSupervisorWithCustomIsTaskCurrent( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + objectMapper, + new KinesisSupervisorSpec( + dataSchema, + tuningConfig, + KinesisSupervisorIOConfig, + null, + suspended, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + objectMapper, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + null + ), + rowIngestionMetersFactory, + isTaskCurrentReturn ); } + /** + * Use for tests where you don't want generateSequenceName to be overridden out + */ private KinesisSupervisor getSupervisor( int replicas, int taskCount, @@ -3579,7 +3971,11 @@ private KinesisSupervisor getSupervisor( String duration, Period lateMessageRejectionPeriod, Period earlyMessageRejectionPeriod, - boolean suspended + boolean suspended, + Integer recordsPerFetch, + Integer fetchDelayMillis, + DataSchema dataSchema, + KinesisSupervisorTuningConfig tuningConfig ) { KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( @@ -3595,8 +3991,8 @@ private KinesisSupervisor getSupervisor( new Period("PT30M"), lateMessageRejectionPeriod, earlyMessageRejectionPeriod, - null, - null, + recordsPerFetch, + fetchDelayMillis, null, null, false @@ -3623,7 +4019,7 @@ public KinesisIndexTaskClient build( } }; - return new TestableKinesisSupervisor( + return new KinesisSupervisor( taskStorage, taskMaster, indexerMetadataStorageCoordinator, @@ -3645,7 +4041,126 @@ public KinesisIndexTaskClient build( rowIngestionMetersFactory, null ), - rowIngestionMetersFactory + rowIngestionMetersFactory, + null + ); + } + + private static DataSchema getDataSchema(String dataSource) + { + List dimensions = new ArrayList<>(); + dimensions.add(StringDimensionSchema.create("dim1")); + dimensions.add(StringDimensionSchema.create("dim2")); + + return new DataSchema( + dataSource, + objectMapper.convertValue( + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + dimensions, + null, + null + ), + new JSONPathSpec(true, ImmutableList.of()), + ImmutableMap.of() + ), + StandardCharsets.UTF_8.name() + ), + Map.class + ), + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.NONE, + ImmutableList.of() + ), + null, + objectMapper + ); + } + + + private KinesisIndexTask createKinesisIndexTask( + String id, + String dataSource, + int taskGroupId, + SeekableStreamStartSequenceNumbers startPartitions, + SeekableStreamEndSequenceNumbers endPartitions, + DateTime minimumMessageTime, + DateTime maximumMessageTime + ) + { + return createKinesisIndexTask( + id, + taskGroupId, + startPartitions, + endPartitions, + minimumMessageTime, + maximumMessageTime, + getDataSchema(dataSource) + ); + } + + private KinesisIndexTask createKinesisIndexTask( + String id, + int taskGroupId, + SeekableStreamStartSequenceNumbers startPartitions, + SeekableStreamEndSequenceNumbers endPartitions, + DateTime minimumMessageTime, + DateTime maximumMessageTime, + DataSchema dataSchema + ) + { + return createKinesisIndexTask( + id, + taskGroupId, + startPartitions, + endPartitions, + minimumMessageTime, + maximumMessageTime, + dataSchema, + 
(KinesisIndexTaskTuningConfig) tuningConfig.convertToTaskTuningConfig() + ); + } + + private KinesisIndexTask createKinesisIndexTask( + String id, + int taskGroupId, + SeekableStreamStartSequenceNumbers startPartitions, + SeekableStreamEndSequenceNumbers endPartitions, + DateTime minimumMessageTime, + DateTime maximumMessageTime, + DataSchema dataSchema, + KinesisIndexTaskTuningConfig tuningConfig + ) + { + return new KinesisIndexTask( + id, + null, + dataSchema, + tuningConfig, + new KinesisIndexTaskIOConfig( + 0, + "sequenceName-" + taskGroupId, + startPartitions, + endPartitions, + true, + minimumMessageTime, + maximumMessageTime, + "awsEndpoint", + null, + null, + null, + null, + false + ), + Collections.emptyMap(), + null, + null, + rowIngestionMetersFactory, + null ); } @@ -3714,7 +4229,9 @@ public TestableKinesisSupervisor( protected String generateSequenceName( Map startPartitions, Optional minimumMessageTime, - Optional maximumMessageTime + Optional maximumMessageTime, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig ) { final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()); @@ -3726,7 +4243,39 @@ protected RecordSupplier setupRecordSupplier() { return supervisorRecordSupplier; } + } + + private class TestableKinesisSupervisorWithCustomIsTaskCurrent extends TestableKinesisSupervisor + { + private boolean isTaskCurrentReturn; + public TestableKinesisSupervisorWithCustomIsTaskCurrent( + TaskStorage taskStorage, + TaskMaster taskMaster, + IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + KinesisIndexTaskClientFactory taskClientFactory, + ObjectMapper mapper, + KinesisSupervisorSpec spec, + RowIngestionMetersFactory rowIngestionMetersFactory, + boolean isTaskCurrentReturn + ) + { + super( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + mapper, + spec, + rowIngestionMetersFactory + ); + this.isTaskCurrentReturn = isTaskCurrentReturn; + } + @Override + public boolean isTaskCurrent(int taskGroupId, String taskId) + { + return isTaskCurrentReturn; + } } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java new file mode 100644 index 000000000000..2485e977d60f --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.kinesis.test; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.io.File; + +@JsonTypeName("KinesisTuningConfig") +public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTuningConfig +{ + private final String extra; + + @JsonCreator + public TestModifiedKinesisIndexTaskTuningConfig( + @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, + @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") Long maxTotalRows, + @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") File basePersistDirectory, + @JsonProperty("maxPendingPersists") Integer maxPendingPersists, + @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("buildV9Directly") Boolean buildV9Directly, + @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, + @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, + @JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck, + @JsonProperty("recordBufferSize") Integer recordBufferSize, + @JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout, + @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait, + @JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout, + @JsonProperty("fetchThreads") Integer fetchThreads, + @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, + @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("extra") String extra + ) + { + super( + maxRowsInMemory, + maxBytesInMemory, + maxRowsPerSegment, + maxTotalRows, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + buildV9Directly, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + skipSequenceNumberAvailabilityCheck, + recordBufferSize, + recordBufferOfferTimeout, + recordBufferFullWait, + fetchSequenceNumberTimeout, + fetchThreads, + segmentWriteOutMediumFactory, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions, + maxRecordsPerPoll, + intermediateHandoffPeriod + ); + this.extra = extra; + } + + public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig base, String extra) + { + super( + base.getMaxRowsInMemory(), + base.getMaxBytesInMemory(), + base.getMaxRowsPerSegment(), + base.getMaxTotalRows(), + base.getIntermediatePersistPeriod(), + base.getBasePersistDirectory(), + base.getMaxPendingPersists(), + base.getIndexSpec(), + base.getBuildV9Directly(), + base.isReportParseExceptions(), 
+ base.getHandoffConditionTimeout(), + base.isResetOffsetAutomatically(), + base.isSkipSequenceNumberAvailabilityCheck(), + base.getRecordBufferSize(), + base.getRecordBufferOfferTimeout(), + base.getRecordBufferFullWait(), + base.getFetchSequenceNumberTimeout(), + base.getFetchThreads(), + base.getSegmentWriteOutMediumFactory(), + base.isLogParseExceptions(), + base.getMaxParseExceptions(), + base.getMaxSavedParseExceptions(), + base.getMaxRecordsPerPoll(), + base.getIntermediateHandoffPeriod() + ); + this.extra = extra; + } + + @JsonProperty("extra") + public String getExtra() + { + return extra; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 265b3a329360..032ab7c5ccff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -73,6 +73,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.EntryExistsException; +import org.apache.druid.segment.indexing.DataSchema; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -166,6 +167,31 @@ private class TaskGroup Optional maximumMessageTime, @Nullable Set exclusiveStartSequenceNumberPartitions ) + { + this( + groupId, + startingSequences, + minimumMessageTime, + maximumMessageTime, + exclusiveStartSequenceNumberPartitions, + generateSequenceName( + startingSequences, + minimumMessageTime, + maximumMessageTime, + spec.getDataSchema(), + taskTuningConfig + ) + ); + } + + TaskGroup( + int groupId, + ImmutableMap startingSequences, + Optional minimumMessageTime, + Optional maximumMessageTime, + Set exclusiveStartSequenceNumberPartitions, + String baseSequenceName + ) { this.groupId = groupId; this.startingSequences = startingSequences; @@ -175,7 +201,7 @@ private class TaskGroup this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions != null ? 
exclusiveStartSequenceNumberPartitions : Collections.emptySet(); - this.baseSequenceName = generateSequenceName(startingSequences, minimumMessageTime, maximumMessageTime); + this.baseSequenceName = baseSequenceName; } int addNewCheckpoint(Map checkpoint) @@ -1153,12 +1179,18 @@ public void resetInternal(DataSourceMetadata dataSourceMetadata) } } if (metadataUpdateSuccess) { - resetMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().keySet().forEach(partition -> { - final int groupId = getTaskGroupIdForPartition(partition); - killTaskGroupForPartitions(ImmutableSet.of(partition), "DataSourceMetadata is updated while reset"); - activelyReadingTaskGroups.remove(groupId); - partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker()); - }); + resetMetadata.getSeekableStreamSequenceNumbers() + .getPartitionSequenceNumberMap() + .keySet() + .forEach(partition -> { + final int groupId = getTaskGroupIdForPartition(partition); + killTaskGroupForPartitions( + ImmutableSet.of(partition), + "DataSourceMetadata is updated while reset" + ); + activelyReadingTaskGroups.remove(groupId); + partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker()); + }); } else { throw new ISE("Unable to reset metadata"); } @@ -1256,7 +1288,8 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti futureTaskIds.add(taskId); futures.add( Futures.transform( - taskClient.getStatusAsync(taskId), new Function() + taskClient.getStatusAsync(taskId), + new Function() { @Override public Boolean apply(SeekableStreamIndexTaskRunner.Status status) @@ -1270,7 +1303,8 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status) .keySet() .forEach( partition -> addDiscoveredTaskToPendingCompletionTaskGroups( - getTaskGroupIdForPartition(partition), + getTaskGroupIdForPartition( + partition), taskId, seekableStreamIndexTask.getIOConfig() .getStartSequenceNumbers() @@ -1294,7 +1328,9 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status) succeeded = true; SequenceOffsetType previousOffset = partitionOffsets.putIfAbsent(partition, sequence); if (previousOffset != null - && (makeSequenceNumber(previousOffset).compareTo(makeSequenceNumber(sequence))) < 0) { + && (makeSequenceNumber(previousOffset) + .compareTo(makeSequenceNumber( + sequence))) < 0) { succeeded = partitionOffsets.replace(partition, previousOffset, sequence); } } while (!succeeded); @@ -1310,7 +1346,8 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status) taskId ); try { - stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS); + stopTask(taskId, false) + .get(futureTimeoutInSeconds, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.warn(e, "Exception while stopping task"); @@ -1326,7 +1363,8 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status) taskId ); try { - stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS); + stopTask(taskId, false) + .get(futureTimeoutInSeconds, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.warn(e, "Exception while stopping task"); @@ -1337,6 +1375,8 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status) taskGroupId, k -> { log.info("Creating a new task group for taskGroupId[%d]", taskGroupId); + // We reassign the task's original base sequence name (from the existing task) to the + // task group so that the replica segment allocations are the same. 
                           return new TaskGroup(
                               taskGroupId,
                               ImmutableMap.copyOf(
@@ -1348,7 +1388,8 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status)
                               seekableStreamIndexTask.getIOConfig().getMaximumMessageTime(),
                               seekableStreamIndexTask.getIOConfig()
                                                      .getStartSequenceNumbers()
-                                                     .getExclusivePartitions()
+                                                     .getExclusivePartitions(),
+                              seekableStreamIndexTask.getIOConfig().getBaseSequenceName()
                           );
                         }
                     );
@@ -1361,6 +1402,7 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status)
                           taskId
                       );
                     }
+                    verifySameSequenceNameForAllTasksInGroup(taskGroupId);
                   }
                 }
                 return true;
@@ -1370,7 +1412,6 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status)
                 return null;
               }
             }
-
             },
             workerExec
         )
     );
@@ -1378,7 +1419,6 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status)
       }
     }
 
-
     List<Boolean> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
     for (int i = 0; i < results.size(); i++) {
       if (results.get(i) == null) {
@@ -1423,10 +1463,11 @@ private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
     final List<String> taskIds = new ArrayList<>();
 
     for (String taskId : taskGroup.taskIds()) {
-      final ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointsFuture = taskClient.getCheckpointsAsync(
-          taskId,
-          true
-      );
+      final ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointsFuture =
+          taskClient.getCheckpointsAsync(
+              taskId,
+              true
+          );
       futures.add(checkpointsFuture);
       taskIds.add(taskId);
     }
@@ -1611,6 +1652,34 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
     taskGroupList.add(newTaskGroup);
   }
 
+  // Sanity check to ensure that tasks have the same sequence name as their task group
+  private void verifySameSequenceNameForAllTasksInGroup(int groupId)
+  {
+    String taskGroupSequenceName = activelyReadingTaskGroups.get(groupId).baseSequenceName;
+    boolean allSequenceNamesMatch =
+        activelyReadingTaskGroups.get(groupId)
+                                 .tasks
+                                 .keySet()
+                                 .stream()
+                                 .map(x -> {
+                                   Optional<Task> taskOptional = taskStorage.getTask(x);
+                                   if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
+                                     return false;
+                                   }
+                                   @SuppressWarnings("unchecked")
+                                   SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
+                                       (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional.get();
+                                   return task.getIOConfig().getBaseSequenceName();
+                                 })
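+                                 // Tasks missing from storage (or of another task type) mapped to
+                                 // Boolean.FALSE above, which never equals a sequence name, so they
+                                 // fail the check rather than being skipped silently.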
+                                 .allMatch(taskSeqName -> taskSeqName.equals(taskGroupSequenceName));
+    if (!allSequenceNamesMatch) {
+      throw new ISE(
+          "Base sequence names do not match for the tasks in the task group with ID [%s]",
+          groupId
+      );
+    }
+  }
+
   private ListenableFuture<Void> stopTask(final String id, final boolean publish)
   {
     return Futures.transform(
@@ -1630,7 +1699,8 @@ public Void apply(@Nullable Boolean result)
     );
   }
 
-  private boolean isTaskCurrent(int taskGroupId, String taskId)
+  @VisibleForTesting
+  public boolean isTaskCurrent(int taskGroupId, String taskId)
   {
     Optional<Task> taskOptional = taskStorage.getTask(taskId);
     if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
@@ -1638,22 +1708,39 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
     }
 
     @SuppressWarnings("unchecked")
-    SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional
-        .get();
+    SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
+        (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional.get();
+
+    // We recompute the sequence name hash for the supervisor's own configuration and compare this to the hash created
+    // by rehashing the task's sequence name using the most up-to-date class definitions of tuning config and
+    // data schema. Recomputing both ensures that forwards-compatible tasks won't be killed (which would occur
+    // if the hash generated using the old class definitions was used).
+    String taskSequenceName = generateSequenceName(
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        task.getIOConfig().getMinimumMessageTime(),
+        task.getIOConfig().getMaximumMessageTime(),
+        task.getDataSchema(),
+        task.getTuningConfig()
+    );
 
-    String taskSequenceName = task.getIOConfig().getBaseSequenceName();
     if (activelyReadingTaskGroups.get(taskGroupId) != null) {
-      return Preconditions
-          .checkNotNull(activelyReadingTaskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
-          .baseSequenceName
-          .equals(taskSequenceName);
+      TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);
+      return generateSequenceName(
+          taskGroup.startingSequences,
+          taskGroup.minimumMessageTime,
+          taskGroup.maximumMessageTime,
+          spec.getDataSchema(),
+          taskTuningConfig
+      ).equals(taskSequenceName);
     } else {
       return generateSequenceName(
           task.getIOConfig()
               .getStartSequenceNumbers()
               .getPartitionSequenceNumberMap(),
           task.getIOConfig().getMinimumMessageTime(),
-          task.getIOConfig().getMaximumMessageTime()
+          task.getIOConfig().getMaximumMessageTime(),
+          spec.getDataSchema(),
+          taskTuningConfig
       ).equals(taskSequenceName);
     }
   }
@@ -1662,7 +1749,9 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
   protected String generateSequenceName(
       Map<PartitionIdType, SequenceOffsetType> startPartitions,
       Optional<DateTime> minimumMessageTime,
-      Optional<DateTime> maximumMessageTime
+      Optional<DateTime> maximumMessageTime,
+      DataSchema dataSchema,
+      SeekableStreamIndexTaskTuningConfig tuningConfig
   )
   {
     StringBuilder sb = new StringBuilder();
@@ -1675,17 +1764,17 @@ protected String generateSequenceName(
 
     String minMsgTimeStr = (minimumMessageTime.isPresent() ? String.valueOf(minimumMessageTime.get().getMillis()) : "");
     String maxMsgTimeStr = (maximumMessageTime.isPresent() ? String.valueOf(maximumMessageTime.get().getMillis()) : "");
 
-    String dataSchema, tuningConfig;
+    String dataSchemaStr, tuningConfigStr;
     try {
-      dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema());
-      tuningConfig = sortingMapper.writeValueAsString(taskTuningConfig);
+      dataSchemaStr = sortingMapper.writeValueAsString(dataSchema);
+      tuningConfigStr = sortingMapper.writeValueAsString(tuningConfig);
     }
     catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
 
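+    // The hash input now includes the serialized data schema and tuning config, so a change to
+    // either produces a different sequence name for subsequently created task groups.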
-    String hashCode = DigestUtils.sha1Hex(dataSchema
-                                          + tuningConfig
+    String hashCode = DigestUtils.sha1Hex(dataSchemaStr
+                                          + tuningConfigStr
                                           + partitionOffsetStr
                                           + minMsgTimeStr
                                           + maxMsgTimeStr)
@@ -2282,7 +2371,6 @@ private void createNewTasks()
                 exclusiveStartSequenceNumberPartitions
             )
         );
-
       }
     }
@@ -2474,8 +2562,8 @@ private void createTasksForGroup(int groupId, int replicas)
                                                          .get(groupId)
                                                          .exclusiveStartSequenceNumberPartitions;
 
-    DateTime minimumMessageTime = activelyReadingTaskGroups.get(groupId).minimumMessageTime.orNull();
-    DateTime maximumMessageTime = activelyReadingTaskGroups.get(groupId).maximumMessageTime.orNull();
+    DateTime minimumMessageTime = group.minimumMessageTime.orNull();
+    DateTime maximumMessageTime = group.maximumMessageTime.orNull();
 
     SeekableStreamIndexTaskIOConfig newIoConfig = createTaskIoConfig(
         groupId,
@@ -2488,7 +2576,6 @@ private void createTasksForGroup(int groupId, int replicas)
         ioConfig
     );
 
-
    List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> taskList = createIndexTasks(
        replicas,
        group.baseSequenceName,
@@ -2711,7 +2798,7 @@ protected abstract List
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+  @Test
+  public void testSerdeWithUpdatedDataSchemaAddedField() throws IOException
+  {
+    Map<String, Object> parser = jsonMapper.convertValue(
+        new StringInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec("time", "auto", null),
+                new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
+                null,
+                null
+            ),
+            null
+        ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    DataSchema originalSchema = new DataSchema(
+        "test",
+        parser,
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("metric1", "col1"),
+            new DoubleSumAggregatorFactory("metric2", "col2"),
+        },
+        new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
+        null,
+        jsonMapper
+    );
+
+    String serialized = jsonMapper.writeValueAsString(originalSchema);
+    TestModifiedDataSchema deserialized = jsonMapper.readValue(serialized, TestModifiedDataSchema.class);
+
+    Assert.assertEquals(null, deserialized.getExtra());
+    Assert.assertEquals(originalSchema.getDataSource(), deserialized.getDataSource());
+    Assert.assertEquals(originalSchema.getGranularitySpec(), deserialized.getGranularitySpec());
+    Assert.assertEquals(originalSchema.getParser().getParseSpec(), deserialized.getParser().getParseSpec());
+    Assert.assertArrayEquals(originalSchema.getAggregators(), deserialized.getAggregators());
+    Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
+    Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
+  }
+
+  @Test
+  public void testSerdeWithUpdatedDataSchemaRemovedField() throws IOException
+  {
+    Map<String, Object> parser = jsonMapper.convertValue(
+        new StringInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec("time", "auto", null),
+                new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
+                null,
+                null
+            ),
+            null
+        ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    TestModifiedDataSchema originalSchema = new TestModifiedDataSchema(
+        "test",
+        parser,
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("metric1", "col1"),
+            new DoubleSumAggregatorFactory("metric2", "col2"),
+        },
+        new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
+        null,
+        jsonMapper,
+        "some arbitrary string"
+    );
+
+    String serialized = jsonMapper.writeValueAsString(originalSchema);
+    DataSchema deserialized = jsonMapper.readValue(serialized, DataSchema.class);
+
+    Assert.assertEquals(originalSchema.getDataSource(), deserialized.getDataSource());
+    Assert.assertEquals(originalSchema.getGranularitySpec(), deserialized.getGranularitySpec());
+    Assert.assertEquals(originalSchema.getParser().getParseSpec(), deserialized.getParser().getParseSpec());
+    Assert.assertArrayEquals(originalSchema.getAggregators(), deserialized.getAggregators());
+    Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
+    Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java b/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java
new file mode 100644
index 000000000000..ca030fe875c5
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import java.util.Map;
+
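+/**
+ * Mirrors DataSchema but serializes one extra field; used by DataSchemaTest to simulate a schema
+ * class that gained (or lost) a field across versions.
+ */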
+public class TestModifiedDataSchema extends DataSchema
+{
+  private final String extra;
+
+  @JsonCreator
+  public TestModifiedDataSchema(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("parser") Map<String, Object> parser,
+      @JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
+      @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+      @JsonProperty("transformSpec") TransformSpec transformSpec,
+      @JacksonInject ObjectMapper jsonMapper,
+      @JsonProperty("extra") String extra
+  )
+  {
+    super(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper);
+    this.extra = extra;
+  }
+
+  @JsonProperty("extra")
+  public String getExtra()
+  {
+    return extra;
+  }
+}