Skip to content

Commit

Permalink
Fix error handling after pause request in Kafka supervisor (#6754)
Browse files Browse the repository at this point in the history
* Fix error handling after pause request in kafka supervisor

* fix test

* fix test
  • Loading branch information
jihoonson authored and gianm committed Dec 19, 2018
1 parent 9505074 commit 4591c56
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexing.common.IndexTaskClient;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.kafka.KafkaIndexTask.Status;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -115,31 +116,48 @@ public Map<Integer, Long> pause(final String id)
true
);

if (response.getStatus().equals(HttpResponseStatus.OK)) {
final HttpResponseStatus responseStatus = response.getStatus();
final String responseContent = response.getContent();

if (responseStatus.equals(HttpResponseStatus.OK)) {
log.info("Task [%s] paused successfully", id);
return deserialize(response.getContent(), new TypeReference<Map<Integer, Long>>()
return deserialize(responseContent, new TypeReference<Map<Integer, Long>>()
{
});
}

while (true) {
if (getStatus(id) == KafkaIndexTask.Status.PAUSED) {
return getCurrentOffsets(id, true);
}
} else if (responseStatus.equals(HttpResponseStatus.ACCEPTED)) {
// The task received the pause request, but its status hasn't been changed yet.
while (true) {
final Status status = getStatus(id);
if (status == KafkaIndexTask.Status.PAUSED) {
return getCurrentOffsets(id, true);
}

final Duration delay = newRetryPolicy().getAndIncrementRetryDelay();
if (delay == null) {
log.error("Task [%s] failed to pause, aborting", id);
throw new ISE("Task [%s] failed to pause, aborting", id);
} else {
final long sleepTime = delay.getMillis();
log.info(
"Still waiting for task [%s] to pause; will try again in [%s]",
id,
new Duration(sleepTime).toString()
);
Thread.sleep(sleepTime);
final Duration delay = newRetryPolicy().getAndIncrementRetryDelay();
if (delay == null) {
throw new ISE(
"Task [%s] failed to change its status from [%s] to [%s], aborting",
id,
status,
Status.PAUSED
);
} else {
final long sleepTime = delay.getMillis();
log.info(
"Still waiting for task [%s] to change its status to [%s]; will try again in [%s]",
id,
Status.PAUSED,
new Duration(sleepTime).toString()
);
Thread.sleep(sleepTime);
}
}
} else {
throw new ISE(
"Pause request for task [%s] failed with response [%s] : [%s]",
id,
responseStatus,
responseContent
);
}
}
catch (NoTaskLocationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1628,13 +1628,39 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
// 3) Build a map of the highest offset read by any task in the group for each partition
final Map<Integer, Long> endOffsets = new HashMap<>();
for (int i = 0; i < input.size(); i++) {
Map<Integer, Long> result = input.get(i);
final Map<Integer, Long> result = input.get(i);
final String taskId = pauseTaskIds.get(i);

if (result == null || result.isEmpty()) { // kill tasks that didn't return a value
String taskId = pauseTaskIds.get(i);
killTask(taskId, "Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId);
if (result == null) {
// Get the exception
final Throwable pauseException;
try {
// The below get should throw ExecutionException since result is null.
final Map<Integer, Long> pauseResult = pauseFutures.get(i).get();
throw new ISE(
"WTH? The pause request for task [%s] is supposed to fail, but returned [%s]",
taskId,
pauseResult
);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
pauseException = e.getCause() == null ? e : e.getCause();
}

killTask(
taskId,
"An exception occured while waiting for task [%s] to pause: [%s]",
taskId,
pauseException
);
taskGroup.tasks.remove(taskId);

} else if (result.isEmpty()) {
killTask(taskId, "Task [%s] returned empty offsets after pause", taskId);
taskGroup.tasks.remove(taskId);
} else { // otherwise build a map of the highest offsets seen
for (Entry<Integer, Long> offset : result.entrySet()) {
if (!endOffsets.containsKey(offset.getKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,10 @@ public void testPauseWithSubsequentGetOffsets() throws Exception
Capture<Request> captured = Capture.newInstance();
Capture<Request> captured2 = Capture.newInstance();
Capture<Request> captured3 = Capture.newInstance();
// one time in IndexTaskClient.submitRequest() and another in KafkaIndexTaskClient.pause()
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED).times(2)
.andReturn(HttpResponseStatus.OK).times(2);
expect(responseHolder.getContent()).andReturn("\"PAUSED\"")
.andReturn(HttpResponseStatus.OK).anyTimes();
expect(responseHolder.getContent()).andReturn("\"PAUSED\"").times(2)
.andReturn("{\"0\":1, \"1\":10}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1537,8 +1537,9 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception
.andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2);
taskQueue.shutdown(
EasyMock.contains("sequenceName-0"),
EasyMock.eq("Task [%s] failed to respond to [pause] in a timely manner, killing task"),
EasyMock.contains("sequenceName-0")
EasyMock.eq("An exception occured while waiting for task [%s] to pause: [%s]"),
EasyMock.contains("sequenceName-0"),
EasyMock.anyString()
);
expectLastCall().times(2);
expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
Expand Down

0 comments on commit 4591c56

Please sign in to comment.