Skip to content

Commit

Permalink
Merge pull request #1792 from GihanAyesh/master
Browse files Browse the repository at this point in the history
Move the log entries of initialiseExecutors
  • Loading branch information
AnuGayan authored Sep 9, 2022
2 parents 544457e + efaa189 commit 1de1a3d
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -718,9 +718,7 @@ public void initialiseExecutors(boolean isFirstEventArrived) {
}
}
}
LOG.info("Starting initialization of executors for aggregation: " + getAggregationDefinition().getId());
this.incrementalExecutorsInitialiser.initialiseExecutors();
LOG.info("Finished initialization of executors for aggregation: " + getAggregationDefinition().getId());
}

public void processEvents(ComplexEventChunk<StreamEvent> streamEventComplexEventChunk) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.core.window.Window;
import io.siddhi.query.api.aggregation.TimePeriod;
import io.siddhi.query.api.definition.AggregationDefinition;
import io.siddhi.query.api.execution.query.OnDemandQuery;
import io.siddhi.query.api.execution.query.input.store.InputStore;
import io.siddhi.query.api.execution.query.selection.OrderByAttribute;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class IncrementalExecutorsInitialiser {
private boolean isInitialised;
private boolean isReadOnly;
private boolean isPersistedAggregation;
private AggregationDefinition aggregationDefinition;

public IncrementalExecutorsInitialiser(List<TimePeriod.Duration> incrementalDurations,
Map<TimePeriod.Duration, Table> aggregationTables,
Expand All @@ -82,7 +84,8 @@ public IncrementalExecutorsInitialiser(List<TimePeriod.Duration> incrementalDura
MetaStreamEvent metaStreamEvent, Map<String, Table> tableMap,
Map<String, Window> windowMap,
Map<String, AggregationRuntime> aggregationMap, String timeZone,
boolean isReadOnly, boolean isPersistedAggregation) {
boolean isReadOnly, boolean isPersistedAggregation,
AggregationDefinition aggregationDefinition) {
this.timeZone = timeZone;
this.incrementalDurations = incrementalDurations;
this.aggregationTables = aggregationTables;
Expand All @@ -101,13 +104,15 @@ public IncrementalExecutorsInitialiser(List<TimePeriod.Duration> incrementalDura
this.isInitialised = false;
this.isReadOnly = isReadOnly;
this.isPersistedAggregation = isPersistedAggregation;
this.aggregationDefinition = aggregationDefinition;
}

public synchronized void initialiseExecutors() {
if (this.isInitialised || isReadOnly) {
// Only cleared when executors change from reading to processing state in one node deployment
return;
}
log.info("Starting initialization of executors for aggregation: " + aggregationDefinition.getId());
Event[] events;
Long lastData = null;

Expand Down Expand Up @@ -195,6 +200,7 @@ public synchronized void initialiseExecutors() {
}
}
this.isInitialised = true;
log.info("Finished initialization of executors for aggregation: " + aggregationDefinition.getId());
}

private boolean isStatePresentForAggregationDuration(TimePeriod.Duration recreateForDuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ public static AggregationRuntime parse(AggregationDefinition aggregationDefiniti
//Recreate in-memory data from tables
IncrementalExecutorsInitialiser incrementalExecutorsInitialiser = new IncrementalExecutorsInitialiser(
aggregationDurations, aggregationTables, incrementalExecutorMap, isDistributed, shardId,
siddhiAppContext, processedMetaStreamEvent, tableMap, windowMap, aggregationMap, timeZone, isReadOnly, isPersistedAggregation);
siddhiAppContext, processedMetaStreamEvent, tableMap, windowMap, aggregationMap, timeZone, isReadOnly, isPersistedAggregation, aggregationDefinition);

IncrementalExecutor rootIncrementalExecutor = (IncrementalExecutor) incrementalExecutorMap.
get(aggregationDurations.get(0));
Expand Down

0 comments on commit 1de1a3d

Please sign in to comment.