From e4409be184ada6d7bcec042f4ebce734c65696e6 Mon Sep 17 00:00:00 2001 From: AnuGayan Date: Mon, 24 May 2021 16:28:54 +0530 Subject: [PATCH] Improve incremental aggregation error handling --- .../core/aggregation/IncrementalExecutor.java | 20 ++++++++++--------- .../core/util/parser/AggregationParser.java | 1 - 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java index 60a2e5b338..eecd654853 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java @@ -215,17 +215,19 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString()); } if (isProcessingExecutor) { - try { - executorService.execute(() -> { + executorService.execute(() -> { + try { table.addEvents(tableEventChunk, streamEventMap.size()); + } catch (Throwable t) { + LOG.error("Exception occurred at siddhi app '" + this.siddhiAppName + + "' when performing table writes of aggregation '" + this.aggregatorName + + "' for duration '" + this.duration + "'. This should be investigated as this " + + "can cause accuracy loss.", t); + } finally { isProcessFinished.set(true); - }); - } catch (Throwable t) { - LOG.error("Exception occurred at siddhi app '" + this.siddhiAppName + - "' when performing table writes of aggregation '" + this.aggregatorName + - "' for duration '" + this.duration + "'. This should be investigated as this " + - "can cause accuracy loss.", t); - } + } + }); + } if (waitUntillprocessFinish) { try { diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/AggregationParser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/AggregationParser.java index a0296a3b48..6ac1af6b88 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/AggregationParser.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/AggregationParser.java @@ -1255,7 +1255,6 @@ private static String generateDatabaseQuery(List expressionE .equals(AGG_START_TIMESTAMP_COL)) { outerSelectColumnJoiner.add(" ? " + SQL_AS + variableExpressionExecutor.getAttribute().getName()); } else if (!variableExpressionExecutor.getAttribute().getName().equals(AGG_EXTERNAL_TIMESTAMP_COL)) { - subSelectT2ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName()); if (groupByColumnNames.contains(variableExpressionExecutor.getAttribute().getName())) { subSelectT2ColumnJoiner.add(INNER_SELECT_QUERY_REF_T3 + "." + variableExpressionExecutor.getAttribute().getName() + SQL_AS +