Add checking for new checkpoint (apache#14353)
Check that a checkpoint is non-empty before adding it to the checkpoint sequence in a SeekableStreamSupervisor (a standalone sketch of the guard's behavior follows the first file's diff below).
panhongan authored and jakubmatyszewski committed Sep 8, 2023
1 parent f6969e5 · commit d5ab062
Showing 2 changed files with 270 additions and 2 deletions.
@@ -40,6 +40,7 @@
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections4.MapUtils;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.error.DruidException;
@@ -691,8 +692,12 @@ public void handle() throws ExecutionException, InterruptedException
return;
}
final Map<PartitionIdType, SequenceOffsetType> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
-taskGroup.addNewCheckpoint(newCheckpoint);
-log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
+if (MapUtils.isNotEmpty(newCheckpoint)) {
+  taskGroup.addNewCheckpoint(newCheckpoint);
+  log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
+} else {
+  log.warn("New checkpoint is null for taskGroup [%s]", taskGroupId);
+}
}
}

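The new guard relies on MapUtils.isNotEmpty from commons-collections4, which returns false for both a null map and an empty map, so either result from checkpointTaskGroup(...) is logged and skipped rather than appended to the task group's checkpoint sequence. The following is a minimal, standalone sketch of that behavior; it is not Druid code, the class and helper names are hypothetical, and it only assumes commons-collections4 on the classpath.

import java.util.Collections;
import java.util.Map;
import org.apache.commons.collections4.MapUtils;

public class CheckpointGuardSketch
{
  // Hypothetical stand-in for TaskGroup.addNewCheckpoint(...).
  private static void addNewCheckpoint(Map<String, Long> checkpoint)
  {
    System.out.println("added checkpoint " + checkpoint);
  }

  // Mirrors the shape of the new guard: only non-empty checkpoints are added.
  private static void handleCheckpoint(Map<String, Long> newCheckpoint, int taskGroupId)
  {
    if (MapUtils.isNotEmpty(newCheckpoint)) {
      addNewCheckpoint(newCheckpoint);
    } else {
      // Reached for both null and empty maps.
      System.out.println("New checkpoint is null for taskGroup [" + taskGroupId + "]");
    }
  }

  public static void main(String[] args)
  {
    handleCheckpoint(Collections.singletonMap("0", 10L), 0); // added
    handleCheckpoint(Collections.emptyMap(), 0);             // skipped
    handleCheckpoint(null, 0);                               // skipped
  }
}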
@@ -19,12 +19,16 @@

package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionSchema;
@@ -34,6 +38,8 @@
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
@@ -44,6 +50,7 @@
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
@@ -57,6 +64,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -98,6 +106,7 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -771,6 +780,169 @@ public void testStoppingGracefully()
verifyAll();
}

@Test(timeout = 60_000L)
public void testCheckpointForActiveTaskGroup() throws InterruptedException, JsonProcessingException
{
DateTime startTime = DateTimes.nowUtc();
SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
STREAM,
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
1,
new Period("PT1H"),
new Period("PT1S"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
null,
new IdleConfig(true, 200L),
null
) {};

EasyMock.reset(spec);
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
@Override
public Duration getEmissionDuration()
{
return new Period("PT2S").toStandardDuration();
}
}).anyTimes();
EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

SeekableStreamIndexTaskIOConfig taskIoConfig = createTaskIoConfigExt(
0,
Collections.singletonMap("0", "10"),
Collections.singletonMap("0", "20"),
"test",
startTime,
null,
Collections.emptySet(),
ioConfig
);

SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();

TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));

Map<String, Object> context = new HashMap<>();
context.put("checkpoints", new ObjectMapper().writeValueAsString(sequenceOffsets));

SeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
"id1",
null,
getDataSchema(),
taskTuningConfig,
taskIoConfig,
context,
"0"
);

SeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
"id2",
null,
getDataSchema(),
taskTuningConfig,
taskIoConfig,
context,
"0"
);

final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);

Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));

EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2))
.anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();

EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new TestSeekableStreamDataSourceMetadata(null)
).anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();

EasyMock.expect(indexTaskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
EasyMock.expect(indexTaskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)).anyTimes();

ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, partitionOffset);

EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id1"))
.andReturn(Futures.immediateFuture(partitionOffset))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture(partitionOffset))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id1"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id2"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.stopAsync("id1", false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.stopAsync("id2", false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();

replayAll();

SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

supervisor.start();
supervisor.runInternal();

supervisor.checkpoint(
0,
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), ImmutableSet.of())
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}

verifyAll();

Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
}

@Test
public void testEmitBothLag() throws Exception
@@ -2208,4 +2380,95 @@ public List<Event> getEvents()
}
}
}

private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final String taskType;
private final TaskLocation location;
private final String dataSource;

TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, TaskLocation location)
{
super(task.getId(), result);
this.taskType = task.getType();
this.location = location;
this.dataSource = task.getDataSource();
}

@Override
public TaskLocation getLocation()
{
return location;
}

@Override
public String getTaskType()
{
return taskType;
}

@Override
public String getDataSource()
{
return dataSource;
}
}

private static class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata<String, String>
{

@JsonCreator
public TestSeekableStreamDataSourceMetadata(
@JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> partitions
)
{
super(partitions);
}

@Override
public DataSourceMetadata asStartMetadata()
{
final SeekableStreamSequenceNumbers<String, String> sequenceNumbers = getSeekableStreamSequenceNumbers();
if (sequenceNumbers instanceof SeekableStreamEndSequenceNumbers) {
return createConcreteDataSourceMetaData(
((SeekableStreamEndSequenceNumbers<String, String>) sequenceNumbers).asStartPartitions(true)
);
} else {
return this;
}
}

@Override
protected SeekableStreamDataSourceMetadata<String, String> createConcreteDataSourceMetaData(
SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers
)
{
return new TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers);
}
}

private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt(
int groupId,
Map<String, String> startPartitions,
Map<String, String> endPartitions,
String baseSequenceName,
DateTime minimumMessageTime,
DateTime maximumMessageTime,
Set<String> exclusiveStartSequenceNumberPartitions,
SeekableStreamSupervisorIOConfig ioConfig
)
{
return new SeekableStreamIndexTaskIOConfig<String, String>(
groupId,
baseSequenceName,
new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
true,
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat()
)
{
};
}
}
