From fec982dd838cf701c54307db304c1478d5929558 Mon Sep 17 00:00:00 2001 From: Biao Li Date: Fri, 14 Apr 2017 15:00:09 -0400 Subject: [PATCH] make the timeout publishing as a failure --- .../main/java/io/druid/indexing/kafka/KafkaIndexTask.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index b344b8a3e9b5..6261154e7058 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -279,6 +279,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception ) ); + boolean publishedSuccessfully = false; try ( final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); @@ -516,6 +517,7 @@ public boolean publishSegments(Set segments, Object commitMetadata) if (published == null) { throw new ISE("Transaction failure publishing segments, aborting"); } else { + publishedSuccessfully = true; log.info( "Published segments[%s] with metadata[%s].", Joiner.on(", ").join( @@ -548,6 +550,11 @@ public String apply(DataSegment input) throw e; } + if (!publishedSuccessfully){ + log.error("The task did NOT publish the segments successfully, check the value of [completionTimeout]"); + throw e; + } + log.info("The task was asked to stop before completing"); } finally {