Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add checking for new checkpoint #14353

Merged
merged 13 commits into from
Sep 4, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexer.TaskLocation;
Expand Down Expand Up @@ -655,8 +656,12 @@ public void handle() throws ExecutionException, InterruptedException
return;
}
final Map<PartitionIdType, SequenceOffsetType> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
taskGroup.addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
if (MapUtils.isNotEmpty(newCheckpoint)) {
taskGroup.addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
} else {
log.warn("New checkpoint is null for taskGroup [%s]", taskGroupId);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@

package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
Expand All @@ -42,6 +48,7 @@
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
Expand All @@ -55,6 +62,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
Expand Down Expand Up @@ -93,6 +101,7 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -170,8 +179,7 @@ public void setupTest()
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expectLastCall().times(0, 1);

EasyMock
.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("10").anyTimes();
}
Expand Down Expand Up @@ -766,6 +774,169 @@ public void testStoppingGracefully()
verifyAll();
}

@Test(timeout = 60_000L)
public void testCheckpointForActiveTaskGroup()
throws InterruptedException, JsonProcessingException {
DateTime startTime = DateTimes.nowUtc();
SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
STREAM,
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
1,
new Period("PT1H"),
new Period("PT1S"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
null,
new IdleConfig(true, 200L),
null
) {};

EasyMock.reset(spec);
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [SeekableStreamSupervisorSpec.getDataSchema](1) should be avoided because it has been deprecated.
EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
@Override
public Duration getEmitterPeriod()
{
return new Period("PT1S").toStandardDuration();
}
}).anyTimes();
EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

SeekableStreamIndexTaskIOConfig taskIoConfig = createTaskIoConfigExt(
0,
Collections.singletonMap("0", "10"),
Collections.singletonMap("0", "20"),
"test",
startTime,
null,
Collections.emptySet(),
ioConfig
);

SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();

TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));

Map<String, Object> context = new HashMap<>();
context.put("checkpoints", new ObjectMapper().writeValueAsString(sequenceOffsets));

SeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
"id1",
null,
getDataSchema(),
taskTuningConfig,
taskIoConfig,
context,
"0"
);

SeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
"id2",
null,
getDataSchema(),
taskTuningConfig,
taskIoConfig,
context,
"0"
);

final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);

Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));

EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2))
.anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();

EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new TestSeekableStreamDataSourceMetadata(null)
).anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();

EasyMock.expect(indexTaskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
EasyMock.expect(indexTaskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)).anyTimes();

ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, partitionOffset);

EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id1"))
.andReturn(Futures.immediateFuture(partitionOffset))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture(partitionOffset))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id1"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id2"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.stopAsync("id1", false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.stopAsync("id2", false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();

replayAll();

SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

supervisor.start();
supervisor.runInternal();

supervisor.checkpoint(
0,
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), ImmutableSet.of())
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}

verifyAll();

Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
}

@Test
public void testEmitBothLag() throws Exception
Expand Down Expand Up @@ -1640,4 +1811,95 @@ public List<Event> getEvents()
}
}
}

private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final String taskType;
private final TaskLocation location;
private final String dataSource;

TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, TaskLocation location)
{
super(task.getId(), result);
this.taskType = task.getType();
this.location = location;
this.dataSource = task.getDataSource();
}

@Override
public TaskLocation getLocation()
{
return location;
}

@Override
public String getTaskType()
{
return taskType;
}

@Override
public String getDataSource()
{
return dataSource;
}
}

private static class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata<String, String>
{

@JsonCreator
public TestSeekableStreamDataSourceMetadata(
@JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> partitions
)
{
super(partitions);
}

@Override
public DataSourceMetadata asStartMetadata()
{
final SeekableStreamSequenceNumbers<String, String> sequenceNumbers = getSeekableStreamSequenceNumbers();
if (sequenceNumbers instanceof SeekableStreamEndSequenceNumbers) {
return createConcreteDataSourceMetaData(
((SeekableStreamEndSequenceNumbers<String, String>) sequenceNumbers).asStartPartitions(true)
);
} else {
return this;
}
}

@Override
protected SeekableStreamDataSourceMetadata<String, String> createConcreteDataSourceMetaData(
SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers
)
{
return new TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers);
}
}

private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt(
int groupId,
Map<String, String> startPartitions,
Map<String, String> endPartitions,
String baseSequenceName,
DateTime minimumMessageTime,
DateTime maximumMessageTime,
Set<String> exclusiveStartSequenceNumberPartitions,
SeekableStreamSupervisorIOConfig ioConfig
)
{
return new SeekableStreamIndexTaskIOConfig<String, String>(
groupId,
baseSequenceName,
new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
true,
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat()
)
{
};
}
}