Fix state check bug in Kafka Index Task (#5204) (#5248)
[Backport] Fix state check bug in Kafka Index Task #5204
pjain1 authored and jon-wei committed Jan 10, 2018
1 parent d14b261 commit 61bef7d
Showing 4 changed files with 125 additions and 8 deletions.
@@ -512,13 +512,14 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
final Object restoredMetadata = driver.startJob();
if (restoredMetadata == null) {
// no persist has happened so far
// so either this is a brand new task or replacement of a failed task
Preconditions.checkState(sequences.get(0).startOffsets.entrySet().stream().allMatch(
partitionOffsetEntry -> Longs.compare(
partitionOffsetEntry.getValue(),
ioConfig.getStartPartitions()
.getPartitionOffsetMap()
.get(partitionOffsetEntry.getKey())
- ) == 0
+ ) >= 0
), "Sequence offsets are not compatible with start offsets of task");
nextOffsets.putAll(sequences.get(0).startOffsets);
} else {
@@ -545,7 +546,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()
);
}
- // sequences size can 0 only when all sequences got published and task stopped before it could finish
+ // sequences size can be 0 only when all sequences got published and task stopped before it could finish
// which is super rare
if (sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
this.endOffsets.putAll(sequences.size() == 0
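The one-character change above (`== 0` to `>= 0`) is the core of the fix: a replacement task created after a failed task may restore sequence start offsets that are ahead of the start offsets in its ioConfig, because the failed task already performed one or more incremental handoffs. The old equality check rejected that legitimate state. Below is a minimal, self-contained sketch of the relaxed compatibility rule; the class and method names are illustrative only, and it uses java.lang.Long.compare in place of Guava's Longs.compare.

import java.util.Map;

// Illustrative sketch only - not Druid code. Shows why ">= 0" is the right check:
// a restored sequence offset at or beyond the configured start offset is compatible.
public class OffsetCompatibilitySketch
{
  static boolean offsetsCompatible(Map<Integer, Long> sequenceStartOffsets, Map<Integer, Long> configuredStartOffsets)
  {
    return sequenceStartOffsets.entrySet().stream().allMatch(
        entry -> Long.compare(entry.getValue(), configuredStartOffsets.get(entry.getKey())) >= 0
    );
  }

  public static void main(String[] args)
  {
    // Task was configured to start partition 0 at offset 0, but the restored sequence
    // (from a failed task that already handed off) starts at offset 2.
    Map<Integer, Long> configured = Map.of(0, 0L);
    Map<Integer, Long> restored = Map.of(0, 2L);
    System.out.println(offsetsCompatible(restored, configured)); // true with >= 0; the old == 0 check failed here
  }
}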
@@ -670,7 +670,7 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc
// as when the task starts they are sent existing checkpoints
Preconditions.checkState(
checkpoints.size() <= 1,
"Got checkpoint request with null as previous check point, however found more than one checkpoints in metadata store"
"Got checkpoint request with null as previous check point, however found more than one checkpoints"
);
if (checkpoints.size() == 1) {
log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
@@ -155,10 +155,12 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -1557,6 +1559,72 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
}
}

@Test(timeout = 60_000L)
public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
{
// This tests the case when a replacement task is created in place of a failed task
// which has done some incremental handoffs, thus the context will contain starting
// sequence offsets from which the task should start reading and ignore the start offsets
if (!isIncrementalHandoffSupported) {
return;
}
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records) {
kafkaProducer.send(record).get();
}
}

final TreeMap<Integer, Map<Integer, Long>> sequences = new TreeMap<>();
// Here the sequence number is 1 meaning that one incremental handoff was done by the failed task
// and this task should start reading from offset 2 for partition 0
sequences.put(1, ImmutableMap.of(0, 2L));
final Map<String, Object> context = new HashMap<>();
context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
{
}).writeValueAsString(sequences));

final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
"sequence0",
// task should ignore these and use sequence info sent in the context
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
),
context
);

final ListenableFuture<TaskStatus> future = runTask(task);

// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());

// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());

// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);

// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}

private ListenableFuture<TaskStatus> runTask(final Task task)
{
try {
@@ -1615,6 +1683,15 @@ private KafkaIndexTask createTask(
return createTask(taskId, DATA_SCHEMA, ioConfig);
}

private KafkaIndexTask createTask(
final String taskId,
final KafkaIOConfig ioConfig,
final Map<String, Object> context
)
{
return createTask(taskId, DATA_SCHEMA, ioConfig, context);
}

private KafkaIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
@@ -1651,6 +1728,45 @@ private KafkaIndexTask createTask(
return task;
}


private KafkaIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
final KafkaIOConfig ioConfig,
final Map<String, Object> context
)
{
final KafkaTuningConfig tuningConfig = new KafkaTuningConfig(
1000,
maxRowsPerSegment,
new Period("P1Y"),
null,
null,
null,
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
null
);
if (isIncrementalHandoffSupported) {
context.put(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
}

final KafkaIndexTask task = new KafkaIndexTask(
taskId,
null,
cloneDataSchema(dataSchema),
tuningConfig,
ioConfig,
context,
null,
null
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
}

private static DataSchema cloneDataSchema(final DataSchema dataSchema)
{
return new DataSchema(
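For reference, the new test hands the prior checkpoints to the task through the "checkpoints" task-context entry, serialized as JSON. The sketch below is not taken from the Druid test and only assumes Jackson's ObjectMapper; it shows the same round trip: sequence number 1 mapping partition 0 to offset 2, which a replacement task uses instead of the start offsets in its ioConfig.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only - mirrors how the test above builds the "checkpoints" context entry.
public class CheckpointContextSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // Sequence number 1 -> {partition 0 : offset 2}: one incremental handoff already happened.
    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
    checkpoints.put(1, Map.of(0, 2L));

    final Map<String, Object> context = new HashMap<>();
    context.put("checkpoints", mapper.writeValueAsString(checkpoints));

    // A replacement task reads the entry back and resumes partition 0 from offset 2,
    // ignoring the start offsets carried in its ioConfig.
    final TreeMap<Integer, Map<Integer, Long>> restored = mapper.readValue(
        (String) context.get("checkpoints"),
        new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() {}
    );
    System.out.println(restored); // {1={0=2}}
  }
}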
@@ -40,14 +40,14 @@ public interface Supervisor
void reset(DataSourceMetadata dataSourceMetadata);

/**
- * The definition of checkpoint is not very strict as currently it does not affect data or control path
+ * The definition of checkpoint is not very strict as currently it does not affect data or control path.
* On this call Supervisor can potentially checkpoint data processed so far to some durable storage
* for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data
- * represented by dataSourceMetadata
+ * represented by {@param currentCheckpoint} DataSourceMetadata
*
- * @param sequenceName unique Identifier to figure out for which sequence to do check pointing
- * @param previousCheckPoint DataSourceMetadata check pointed in previous call
- * @param currentCheckPoint current DataSourceMetadata to be check pointed
+ * @param sequenceName unique Identifier to figure out for which sequence to do checkpointing
+ * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
+ * @param currentCheckPoint current DataSourceMetadata to be checkpointed
*/
void checkpoint(
@Nullable String sequenceName,
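The javadoc above only describes a contract: the caller passes the metadata it last saw checkpointed plus the metadata to checkpoint now, and the supervisor may persist the new checkpoint and hand off the data it covers. As a rough, hypothetical illustration of that handshake (none of these types or names come from Druid), a checkpointing component might look like this:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the checkpoint(sequenceName, previous, current) contract described above.
// All names here are illustrative; this is not the Druid Supervisor interface.
public class CheckpointContractSketch
{
  // Stand-in for DataSourceMetadata: just partition -> offset.
  record OffsetsMetadata(Map<Integer, Long> offsets) {}

  interface CheckpointingSupervisor
  {
    // previous may be null on the very first checkpoint of a sequence.
    void checkpoint(String sequenceName, OffsetsMetadata previous, OffsetsMetadata current);
  }

  static class InMemorySupervisor implements CheckpointingSupervisor
  {
    private final Deque<OffsetsMetadata> checkpoints = new ArrayDeque<>();

    @Override
    public void checkpoint(String sequenceName, OffsetsMetadata previous, OffsetsMetadata current)
    {
      // Ignore stale requests: the caller's notion of the previous checkpoint must match ours
      // (an assumption for this sketch, not necessarily how Druid resolves conflicts).
      if (!Objects.equals(checkpoints.peekLast(), previous)) {
        System.out.println("Stale checkpoint request for " + sequenceName + ", ignoring");
        return;
      }
      checkpoints.addLast(current);
      // A real supervisor would now trigger merge/handoff of data covered by `current`.
      System.out.println("Checkpointed " + sequenceName + " at " + current.offsets());
    }
  }

  public static void main(String[] args)
  {
    CheckpointingSupervisor supervisor = new InMemorySupervisor();
    supervisor.checkpoint("sequence0", null, new OffsetsMetadata(Map.of(0, 2L)));
    supervisor.checkpoint("sequence0", new OffsetsMetadata(Map.of(0, 2L)), new OffsetsMetadata(Map.of(0, 5L)));
  }
}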
