Skip to content

Commit

Permalink
Merge pull request #1769 from senthuran16/ha-aggregation-evt-duplicat…
Browse files Browse the repository at this point in the history
…ion-fix-master

Fix aggregation event duplication with HA and state persistence
  • Loading branch information
AnuGayan authored Mar 3, 2022
2 parents dd13423 + bdf5659 commit 1063494
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,9 @@ public void initialiseExecutors(boolean isFirstEventArrived) {
}
}
}
LOG.info("Starting initialization of executors for aggregation: " + getAggregationDefinition().getId());
this.incrementalExecutorsInitialiser.initialiseExecutors();
LOG.info("Finished initialization of executors for aggregation: " + getAggregationDefinition().getId());
}

public void processEvents(ComplexEventChunk<StreamEvent> streamEventComplexEventChunk) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public synchronized void process(Map<String, StreamEvent> groupedByEvents) {
}
}

public StateHolder<StoreState> getStoreStateHolder() {
return storeStateHolder;
}

private boolean shouldUpdate(Object data, ValueState state) {
long timestamp = (long) data;
if (timestamp >= state.lastTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
} catch (Throwable t) {
LOG.error("Exception occurred at siddhi app '" + this.siddhiAppName +
"' when performing table writes of aggregation '" + this.aggregatorName +
"' for duration '" + this.duration + "'. This should be investigated as this " +
"can cause accuracy loss.", t);
"' for duration '" + this.duration + "'. Unless state persistence has been enabled, " +
"this should be investigated as this can cause accuracy loss.", t);
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping Event chunk - \"" + eventChunk.toString() + "\"");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.siddhi.core.table.Table;
import io.siddhi.core.util.IncrementalTimeConverterUtil;
import io.siddhi.core.util.parser.OnDemandQueryParser;
import io.siddhi.core.util.snapshot.state.SingleSyncStateHolder;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.core.window.Window;
import io.siddhi.query.api.aggregation.TimePeriod;
import io.siddhi.query.api.execution.query.OnDemandQuery;
Expand Down Expand Up @@ -146,10 +148,6 @@ public synchronized void initialiseExecutors() {
} else {
for (int i = incrementalDurations.size() - 1; i > 0; i--) {

TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);


// Get the table previous to the duration for which we need to recreate (e.g. if we want to recreate
// for minute duration, take the second table [provided that aggregation is done for seconds])
// This lookup is filtered by endOFLatestEventTimestamp
Expand All @@ -167,13 +165,17 @@ public synchronized void initialiseExecutors() {
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(referenceToNextLatestEvent, incrementalDurations.get(i - 1), timeZone);

ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
if (!isStatePresentForAggregationDuration(recreateForDuration)) {
ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
}
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
incrementalExecutor.execute(complexEventChunk);
}
incrementalExecutor.execute(complexEventChunk);

if (i == 1) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Expand All @@ -190,9 +192,17 @@ public synchronized void initialiseExecutors() {
this.isInitialised = true;
}

private boolean isStatePresentForAggregationDuration(TimePeriod.Duration recreateForDuration) {
IncrementalExecutor incrementalExecutor = (IncrementalExecutor) incrementalExecutorMap.get(recreateForDuration);
BaseIncrementalValueStore baseIncrementalValueStore = incrementalExecutor.getBaseIncrementalValueStore();
StateHolder<BaseIncrementalValueStore.StoreState> storeStateHolder =
baseIncrementalValueStore.getStoreStateHolder();
return (storeStateHolder instanceof SingleSyncStateHolder) &&
((SingleSyncStateHolder) storeStateHolder).isStatePresent();
}

private void recreateState(Long lastData, TimePeriod.Duration recreateForDuration,
Table recreateFromTable, boolean isBeforeRoot) {
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
if (lastData != null) {
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(lastData, recreateForDuration, timeZone);
Expand All @@ -205,13 +215,16 @@ private void recreateState(Long lastData, TimePeriod.Duration recreateForDuratio
Event[] events = onDemandQueryRuntime.execute();
if (events != null) {
long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
if (!isStatePresentForAggregationDuration(recreateForDuration)) {
ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
}
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
incrementalExecutor.execute(complexEventChunk);
}
incrementalExecutor.execute(complexEventChunk);
if (isBeforeRoot) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public SingleSyncStateHolder(StateFactory stateFactory) {
this.allStates.put(null, groupByStates);
}

public boolean isStatePresent() {
return state != null;
}

@Override
public State getState() {
if (state == null) {
Expand Down

0 comments on commit 1063494

Please sign in to comment.