Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix aggregation event duplication with HA and state persistence #1769

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,9 @@ public void initialiseExecutors(boolean isFirstEventArrived) {
}
}
}
LOG.info("Starting initialization of executors for aggregation: " + getAggregationDefinition().getId());
this.incrementalExecutorsInitialiser.initialiseExecutors();
LOG.info("Finished initialization of executors for aggregation: " + getAggregationDefinition().getId());
}

public void processEvents(ComplexEventChunk<StreamEvent> streamEventComplexEventChunk) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public synchronized void process(Map<String, StreamEvent> groupedByEvents) {
}
}

public StateHolder<StoreState> getStoreStateHolder() {
return storeStateHolder;
}

private boolean shouldUpdate(Object data, ValueState state) {
long timestamp = (long) data;
if (timestamp >= state.lastTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
} catch (Throwable t) {
LOG.error("Exception occurred at siddhi app '" + this.siddhiAppName +
"' when performing table writes of aggregation '" + this.aggregatorName +
"' for duration '" + this.duration + "'. This should be investigated as this " +
"can cause accuracy loss.", t);
"' for duration '" + this.duration + "'. Unless state persistence has been enabled, " +
"this should be investigated as this can cause accuracy loss.", t);
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping Event chunk - \"" + eventChunk.toString() + "\"");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.siddhi.core.table.Table;
import io.siddhi.core.util.IncrementalTimeConverterUtil;
import io.siddhi.core.util.parser.OnDemandQueryParser;
import io.siddhi.core.util.snapshot.state.SingleSyncStateHolder;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.core.window.Window;
import io.siddhi.query.api.aggregation.TimePeriod;
import io.siddhi.query.api.execution.query.OnDemandQuery;
Expand Down Expand Up @@ -146,10 +148,6 @@ public synchronized void initialiseExecutors() {
} else {
for (int i = incrementalDurations.size() - 1; i > 0; i--) {

TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);


// Get the table previous to the duration for which we need to recreate (e.g. if we want to recreate
// for minute duration, take the second table [provided that aggregation is done for seconds])
// This lookup is filtered by endOFLatestEventTimestamp
Expand All @@ -167,13 +165,17 @@ public synchronized void initialiseExecutors() {
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(referenceToNextLatestEvent, incrementalDurations.get(i - 1), timeZone);

ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
if (!isStatePresentForAggregationDuration(recreateForDuration)) {
ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
}
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
incrementalExecutor.execute(complexEventChunk);
}
incrementalExecutor.execute(complexEventChunk);

if (i == 1) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Expand All @@ -190,9 +192,17 @@ public synchronized void initialiseExecutors() {
this.isInitialised = true;
}

private boolean isStatePresentForAggregationDuration(TimePeriod.Duration recreateForDuration) {
IncrementalExecutor incrementalExecutor = (IncrementalExecutor) incrementalExecutorMap.get(recreateForDuration);
BaseIncrementalValueStore baseIncrementalValueStore = incrementalExecutor.getBaseIncrementalValueStore();
StateHolder<BaseIncrementalValueStore.StoreState> storeStateHolder =
baseIncrementalValueStore.getStoreStateHolder();
return (storeStateHolder instanceof SingleSyncStateHolder) &&
((SingleSyncStateHolder) storeStateHolder).isStatePresent();
}

private void recreateState(Long lastData, TimePeriod.Duration recreateForDuration,
Table recreateFromTable, boolean isBeforeRoot) {
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
if (lastData != null) {
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(lastData, recreateForDuration, timeZone);
Expand All @@ -205,13 +215,16 @@ private void recreateState(Long lastData, TimePeriod.Duration recreateForDuratio
Event[] events = onDemandQueryRuntime.execute();
if (events != null) {
long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
if (!isStatePresentForAggregationDuration(recreateForDuration)) {
ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
}
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
incrementalExecutor.execute(complexEventChunk);
}
incrementalExecutor.execute(complexEventChunk);
if (isBeforeRoot) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public SingleSyncStateHolder(StateFactory stateFactory) {
this.allStates.put(null, groupByStates);
}

public boolean isStatePresent() {
return state != null;
}

@Override
public State getState() {
if (state == null) {
Expand Down