Skip to content

Commit

Permalink
Merge pull request #1726 from AnuGayan/up_master_imp
Browse files Browse the repository at this point in the history
Improve incremental aggregation error handling
  • Loading branch information
dnwick authored May 31, 2021
2 parents 1d33f6d + e4409be commit 97e45e5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,17 +215,19 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
}
if (isProcessingExecutor) {
try {
executorService.execute(() -> {
executorService.execute(() -> {
try {
table.addEvents(tableEventChunk, streamEventMap.size());
} catch (Throwable t) {
LOG.error("Exception occurred at siddhi app '" + this.siddhiAppName +
"' when performing table writes of aggregation '" + this.aggregatorName +
"' for duration '" + this.duration + "'. This should be investigated as this " +
"can cause accuracy loss.", t);
} finally {
isProcessFinished.set(true);
});
} catch (Throwable t) {
LOG.error("Exception occurred at siddhi app '" + this.siddhiAppName +
"' when performing table writes of aggregation '" + this.aggregatorName +
"' for duration '" + this.duration + "'. This should be investigated as this " +
"can cause accuracy loss.", t);
}
}
});

}
if (waitUntillprocessFinish) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,6 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
.equals(AGG_START_TIMESTAMP_COL)) {
outerSelectColumnJoiner.add(" ? " + SQL_AS + variableExpressionExecutor.getAttribute().getName());
} else if (!variableExpressionExecutor.getAttribute().getName().equals(AGG_EXTERNAL_TIMESTAMP_COL)) {
subSelectT2ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName());
if (groupByColumnNames.contains(variableExpressionExecutor.getAttribute().getName())) {
subSelectT2ColumnJoiner.add(INNER_SELECT_QUERY_REF_T3 + "." +
variableExpressionExecutor.getAttribute().getName() + SQL_AS +
Expand Down

0 comments on commit 97e45e5

Please sign in to comment.