From 84acda5ca039c70136237ca9e4b7e8978f6887c1 Mon Sep 17 00:00:00 2001 From: senthuran16 Date: Wed, 2 Mar 2022 23:50:47 +0530 Subject: [PATCH 1/3] Fix aggregation event duplication with HA and state persistence --- .../core/aggregation/AggregationRuntime.java | 1 + .../BaseIncrementalValueStore.java | 4 ++ .../core/aggregation/IncrementalExecutor.java | 4 +- .../IncrementalExecutorsInitialiser.java | 47 ++++++++++++------- .../snapshot/state/SingleSyncStateHolder.java | 4 ++ 5 files changed, 41 insertions(+), 19 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java index 6e70558e63..db75e0f6a1 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java @@ -716,6 +716,7 @@ public void initialiseExecutors(boolean isFirstEventArrived) { } } this.incrementalExecutorsInitialiser.initialiseExecutors(); + LOG.info("Finished initialisation of executors for aggregation: " + getAggregationDefinition().getId()); } public void processEvents(ComplexEventChunk streamEventComplexEventChunk) { diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/BaseIncrementalValueStore.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/BaseIncrementalValueStore.java index 36114f8c3c..48f646e379 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/BaseIncrementalValueStore.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/BaseIncrementalValueStore.java @@ -188,6 +188,10 @@ public synchronized void process(Map groupedByEvents) { } } + public StateHolder getStoreStateHolder() { + return storeStateHolder; + } + private boolean shouldUpdate(Object data, ValueState state) { long timestamp = (long) data; if (timestamp >= state.lastTimestamp) { diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java index 452c25b89d..6bc8477f4a 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java @@ -222,8 +222,8 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt } catch (Throwable t) { LOG.error("Exception occurred at siddhi app '" + this.siddhiAppName + "' when performing table writes of aggregation '" + this.aggregatorName + - "' for duration '" + this.duration + "'. This should be investigated as this " + - "can cause accuracy loss.", t); + "' for duration '" + this.duration + "'. Unless state persistence has been enabled, " + + "this should be investigated as this can cause accuracy loss.", t); if (LOG.isDebugEnabled()) { LOG.debug("Dropping Event chunk - \"" + eventChunk.toString() + "\""); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java index 2c567cf377..d56344886b 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java @@ -28,6 +28,8 @@ import io.siddhi.core.table.Table; import io.siddhi.core.util.IncrementalTimeConverterUtil; import io.siddhi.core.util.parser.OnDemandQueryParser; +import io.siddhi.core.util.snapshot.state.SingleSyncStateHolder; +import io.siddhi.core.util.snapshot.state.StateHolder; import io.siddhi.core.window.Window; import io.siddhi.query.api.aggregation.TimePeriod; import io.siddhi.query.api.execution.query.OnDemandQuery; @@ -146,10 +148,6 @@ public synchronized void initialiseExecutors() { } else { for (int i = incrementalDurations.size() - 1; i > 0; i--) { - TimePeriod.Duration recreateForDuration = incrementalDurations.get(i); - Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration); - - // Get the table previous to the duration for which we need to recreate (e.g. if we want to recreate // for minute duration, take the second table [provided that aggregation is done for seconds]) // This lookup is filtered by endOFLatestEventTimestamp @@ -167,13 +165,17 @@ public synchronized void initialiseExecutors() { endOFLatestEventTimestamp = IncrementalTimeConverterUtil .getNextEmitTime(referenceToNextLatestEvent, incrementalDurations.get(i - 1), timeZone); - ComplexEventChunk complexEventChunk = new ComplexEventChunk<>(); - for (Event event : events) { - StreamEvent streamEvent = streamEventFactory.newInstance(); - streamEvent.setOutputData(event.getData()); - complexEventChunk.add(streamEvent); + TimePeriod.Duration recreateForDuration = incrementalDurations.get(i); + if (!isStatePresentForAggregationDuration(recreateForDuration)) { + ComplexEventChunk complexEventChunk = new ComplexEventChunk<>(); + for (Event event : events) { + StreamEvent streamEvent = streamEventFactory.newInstance(); + streamEvent.setOutputData(event.getData()); + complexEventChunk.add(streamEvent); + } + Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration); + incrementalExecutor.execute(complexEventChunk); } - incrementalExecutor.execute(complexEventChunk); if (i == 1) { TimePeriod.Duration rootDuration = incrementalDurations.get(0); @@ -190,9 +192,17 @@ public synchronized void initialiseExecutors() { this.isInitialised = true; } + private boolean isStatePresentForAggregationDuration(TimePeriod.Duration recreateForDuration) { + IncrementalExecutor incrementalExecutor = (IncrementalExecutor) incrementalExecutorMap.get(recreateForDuration); + BaseIncrementalValueStore baseIncrementalValueStore = incrementalExecutor.getBaseIncrementalValueStore(); + StateHolder storeStateHolder = + baseIncrementalValueStore.getStoreStateHolder(); + return (storeStateHolder instanceof SingleSyncStateHolder) && + ((SingleSyncStateHolder) storeStateHolder).isStatePresent(); + } + private void recreateState(Long lastData, TimePeriod.Duration recreateForDuration, Table recreateFromTable, boolean isBeforeRoot) { - Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration); if (lastData != null) { endOFLatestEventTimestamp = IncrementalTimeConverterUtil .getNextEmitTime(lastData, recreateForDuration, timeZone); @@ -205,13 +215,16 @@ private void recreateState(Long lastData, TimePeriod.Duration recreateForDuratio Event[] events = onDemandQueryRuntime.execute(); if (events != null) { long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0); - ComplexEventChunk complexEventChunk = new ComplexEventChunk<>(); - for (Event event : events) { - StreamEvent streamEvent = streamEventFactory.newInstance(); - streamEvent.setOutputData(event.getData()); - complexEventChunk.add(streamEvent); + if (!isStatePresentForAggregationDuration(recreateForDuration)) { + ComplexEventChunk complexEventChunk = new ComplexEventChunk<>(); + for (Event event : events) { + StreamEvent streamEvent = streamEventFactory.newInstance(); + streamEvent.setOutputData(event.getData()); + complexEventChunk.add(streamEvent); + } + Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration); + incrementalExecutor.execute(complexEventChunk); } - incrementalExecutor.execute(complexEventChunk); if (isBeforeRoot) { TimePeriod.Duration rootDuration = incrementalDurations.get(0); Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration); diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/snapshot/state/SingleSyncStateHolder.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/snapshot/state/SingleSyncStateHolder.java index ef26077ccb..ca549cc06a 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/snapshot/state/SingleSyncStateHolder.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/snapshot/state/SingleSyncStateHolder.java @@ -39,6 +39,10 @@ public SingleSyncStateHolder(StateFactory stateFactory) { this.allStates.put(null, groupByStates); } + public boolean isStatePresent() { + return state != null; + } + @Override public State getState() { if (state == null) { From 1f6e0d125b2ef0494b0aebc251a684396975124b Mon Sep 17 00:00:00 2001 From: senthuran16 Date: Thu, 3 Mar 2022 00:23:58 +0530 Subject: [PATCH 2/3] Fix aggregation event duplication with HA and state persistence --- .../main/java/io/siddhi/core/aggregation/AggregationRuntime.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java index db75e0f6a1..757aa2358f 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java @@ -715,6 +715,7 @@ public void initialiseExecutors(boolean isFirstEventArrived) { } } } + LOG.info("Starting initialisation of executors for aggregation: " + getAggregationDefinition().getId()); this.incrementalExecutorsInitialiser.initialiseExecutors(); LOG.info("Finished initialisation of executors for aggregation: " + getAggregationDefinition().getId()); } From bdf56593a13eeeb1a0c2d4d7c8f8e90613e4e81a Mon Sep 17 00:00:00 2001 From: senthuran16 Date: Thu, 3 Mar 2022 14:07:31 +0530 Subject: [PATCH 3/3] Replace 'initialisation' with 'initialization' in logs --- .../java/io/siddhi/core/aggregation/AggregationRuntime.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java index 757aa2358f..ccadf7dfec 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java @@ -715,9 +715,9 @@ public void initialiseExecutors(boolean isFirstEventArrived) { } } } - LOG.info("Starting initialisation of executors for aggregation: " + getAggregationDefinition().getId()); + LOG.info("Starting initialization of executors for aggregation: " + getAggregationDefinition().getId()); this.incrementalExecutorsInitialiser.initialiseExecutors(); - LOG.info("Finished initialisation of executors for aggregation: " + getAggregationDefinition().getId()); + LOG.info("Finished initialization of executors for aggregation: " + getAggregationDefinition().getId()); } public void processEvents(ComplexEventChunk streamEventComplexEventChunk) {