Fix state check bug in Kafka Index Task #5204

Merged 3 commits on Jan 9, 2018
@@ -511,13 +511,14 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
    final Object restoredMetadata = driver.startJob();
    if (restoredMetadata == null) {
      // no persist has happened so far
      // so either this is a brand new task or replacement of a failed task
      Preconditions.checkState(sequences.get(0).startOffsets.entrySet().stream().allMatch(
          partitionOffsetEntry -> Longs.compare(
              partitionOffsetEntry.getValue(),
              ioConfig.getStartPartitions()
                      .getPartitionOffsetMap()
                      .get(partitionOffsetEntry.getKey())
-         ) == 0
+         ) >= 0
      ), "Sequence offsets are not compatible with start offsets of task");
      nextOffsets.putAll(sequences.get(0).startOffsets);
    } else {
@@ -544,7 +545,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
          ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()
      );
    }
-   // sequences size can 0 only when all sequences got published and task stopped before it could finish
+   // sequences size can be 0 only when all sequences got published and task stopped before it could finish
    // which is super rare
    if (sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
      this.endOffsets.putAll(sequences.size() == 0
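The first hunk is the actual bug fix: a replacement task restores its sequences from the task context, and after incremental handoffs those restored start offsets can legitimately be ahead of the offsets in the ioConfig, so the old equality check rejected a valid task. A minimal standalone sketch of the relaxed rule (hypothetical helper names, requires Guava, not part of the patch):

import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Longs;

import java.util.Map;

// Sketch (hypothetical helper): a sequence is compatible when each partition's restored
// offset is at or beyond the configured start offset, rather than exactly equal to it.
class OffsetCheckSketch
{
  static boolean offsetsCompatible(Map<Integer, Long> sequenceStart, Map<Integer, Long> configStart)
  {
    return sequenceStart.entrySet().stream().allMatch(
        entry -> Longs.compare(entry.getValue(), configStart.get(entry.getKey())) >= 0
    );
  }

  public static void main(String[] args)
  {
    // The failed task already handed off through offset 2 on partition 0 while the
    // ioConfig still says "start at 0": valid under ">= 0", rejected by the old "== 0".
    System.out.println(offsetsCompatible(ImmutableMap.of(0, 2L), ImmutableMap.of(0, 0L))); // true
  }
}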
@@ -670,7 +670,7 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutException
        // as when the task starts they are sent existing checkpoints
        Preconditions.checkState(
            checkpoints.size() <= 1,
-           "Got checkpoint request with null as previous check point, however found more than one checkpoints in metadata store"
+           "Got checkpoint request with null as previous check point, however found more than one checkpoints"
        );
        if (checkpoints.size() == 1) {
          log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
@@ -152,10 +152,12 @@
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -1555,6 +1557,72 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAvailable
}
}

  @Test(timeout = 60_000L)
  public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
  {
    // This tests the case when a replacement task is created in place of a failed task
    // which has done some incremental handoffs, thus the context will contain starting
    // sequence offsets from which the task should start reading and ignore the start offsets in the ioConfig
    if (!isIncrementalHandoffSupported) {
      return;
    }
    // Insert data
    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
      for (ProducerRecord<byte[], byte[]> record : records) {
        kafkaProducer.send(record).get();
      }
    }

    final TreeMap<Integer, Map<Integer, Long>> sequences = new TreeMap<>();
    // Here the sequence number is 1, meaning that one incremental handoff was done by the failed task
    // and this task should start reading from offset 2 for partition 0
    sequences.put(1, ImmutableMap.of(0, 2L));
    final Map<String, Object> context = new HashMap<>();
    context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
    {
    }).writeValueAsString(sequences));

    final KafkaIndexTask task = createTask(
        null,
        new KafkaIOConfig(
            "sequence0",
            // task should ignore these and use sequence info sent in the context
            new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
            kafkaServer.consumerProperties(),
            true,
            false,
            null,
            null,
            false
        ),
        context
    );

    final ListenableFuture<TaskStatus> future = runTask(task);

    // Wait for task to exit
    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());

    // Check metrics
    Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
    Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
    Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());

    // Check published metadata
    SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
    SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
    Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
    Assert.assertEquals(
        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
    );

    // Check segments in deep storage
    Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
    Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
  }
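The "checkpoints" context entry built above is just a JSON string. As a rough standalone illustration (hypothetical names, not the task's actual restore path), the same map round-trips with Jackson like this:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.TreeMap;

class CheckpointsRoundTripSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final TreeMap<Integer, Map<Integer, Long>> sequences = new TreeMap<>();
    sequences.put(1, ImmutableMap.of(0, 2L));

    // Serialize the checkpoints the same way the test stores them in the context...
    final String json = mapper.writeValueAsString(sequences);

    // ...and deserialize them the way a restarted task could recover them.
    final TreeMap<Integer, Map<Integer, Long>> restored = mapper.readValue(
        json,
        new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() {}
    );
    System.out.println(restored); // {1={0=2}}
  }
}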

  private ListenableFuture<TaskStatus> runTask(final Task task)
  {
    try {
@@ -1613,6 +1681,15 @@ private KafkaIndexTask createTask(
    return createTask(taskId, DATA_SCHEMA, ioConfig);
  }

  private KafkaIndexTask createTask(
      final String taskId,
      final KafkaIOConfig ioConfig,
      final Map<String, Object> context
  )
  {
    return createTask(taskId, DATA_SCHEMA, ioConfig, context);
  }

  private KafkaIndexTask createTask(
      final String taskId,
      final DataSchema dataSchema,
@@ -1649,6 +1726,45 @@ private KafkaIndexTask createTask(
    return task;
  }


  private KafkaIndexTask createTask(
      final String taskId,
      final DataSchema dataSchema,
      final KafkaIOConfig ioConfig,
      final Map<String, Object> context
  )
  {
    final KafkaTuningConfig tuningConfig = new KafkaTuningConfig(
        1000,
        maxRowsPerSegment,
        new Period("P1Y"),
        null,
        null,
        null,
        true,
        reportParseExceptions,
        handoffConditionTimeout,
        resetOffsetAutomatically,
        null
    );
    if (isIncrementalHandoffSupported) {
      context.put(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
    }

    final KafkaIndexTask task = new KafkaIndexTask(
        taskId,
        null,
        cloneDataSchema(dataSchema),
        tuningConfig,
        ioConfig,
        context,
        null,
        null
    );
    task.setPollRetryMs(POLL_RETRY_MS);
    return task;
  }

  private static DataSchema cloneDataSchema(final DataSchema dataSchema)
  {
    return new DataSchema(
@@ -40,14 +40,14 @@ public interface Supervisor
  void reset(DataSourceMetadata dataSourceMetadata);

  /**
-  * The definition of checkpoint is not very strict as currently it does not affect data or control path
+  * The definition of checkpoint is not very strict as currently it does not affect data or control path.
   * On this call Supervisor can potentially checkpoint data processed so far to some durable storage
   * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data
-  * represented by dataSourceMetadata
+  * represented by {@param currentCheckpoint} DataSourceMetadata
   *
-  * @param sequenceName unique Identifier to figure out for which sequence to do check pointing
-  * @param previousCheckPoint DataSourceMetadata check pointed in previous call
-  * @param currentCheckPoint current DataSourceMetadata to be check pointed
+  * @param sequenceName unique Identifier to figure out for which sequence to do checkpointing
+  * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
+  * @param currentCheckPoint current DataSourceMetadata to be checkpointed
   */
  void checkpoint(
      @Nullable String sequenceName,
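The diff truncates the rest of the checkpoint signature, so here is a rough sketch of the contract an implementor sees, inferred only from the @param tags above (the stand-in types, parameter order, and nullability beyond sequenceName are assumptions, not the actual Druid interface):

import javax.annotation.Nullable;

// Hypothetical stand-in for the Druid type referenced by the javadoc.
interface DataSourceMetadata {}

interface SupervisorSketch
{
  // Sketch of the checkpoint contract described above: the supervisor may persist progress
  // up to currentCheckPoint; previousCheckPoint is whatever was checkpointed in the prior call.
  void checkpoint(
      @Nullable String sequenceName,
      @Nullable DataSourceMetadata previousCheckPoint,
      DataSourceMetadata currentCheckPoint
  );
}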