
Localized state management, and improved partition #1158

Merged 23 commits on Apr 1, 2019

Commits
29d4b1e
Add state classes
suhothayan Feb 25, 2019
e9a6664
Merge branch 'query-context' into state-partition
suhothayan Feb 26, 2019
fc771aa
Add query state management, remove cloning, improve data storage format
suhothayan Mar 21, 2019
12638b5
Remove redundant state management and synchronizations
suhothayan Mar 21, 2019
d744e74
Fix incremental data storage, improve StateHandler to handle group by
suhothayan Mar 22, 2019
a92c499
Fix concurrency and build issues.
suhothayan Mar 22, 2019
a7fa11a
Fix build issues
suhothayan Mar 23, 2019
d1bb4a6
Add groupBy cleaning function, improve group by perf better than 4.x.
suhothayan Mar 24, 2019
b4e1d5f
Add groupBy cleaning function, improve group by perf better than 4.x.
suhothayan Mar 24, 2019
7f9d964
Add perf tests
suhothayan Mar 25, 2019
fce0c1f
Remove unnecessary synchronizations
suhothayan Mar 26, 2019
f498eee
Make snapshot and restore synchronized.
suhothayan Mar 27, 2019
809c5d6
Optimize group by data cleaning
suhothayan Mar 27, 2019
693948e
Remove event pools
suhothayan Mar 27, 2019
78fa7a5
Merge remote-tracking branch 'siddhi-io/master' into state-partition
suhothayan Mar 27, 2019
f86f8ff
Add partition purging
suhothayan Mar 28, 2019
7e7acf4
Add Session window test to the build
suhothayan Mar 29, 2019
39d6677
Add Delay window test to the build
suhothayan Mar 29, 2019
930588b
Improve partition creation notification
suhothayan Mar 29, 2019
fa6860d
Rename partition retention.period to idle.period
suhothayan Mar 29, 2019
7065699
Fix behaviour of "output first every <time>"
suhothayan Mar 30, 2019
61ab808
Fix behaviour of "output first every"
suhothayan Mar 31, 2019
0f5ee5f
Fix test by increasing wait time.
suhothayan Apr 1, 2019
4 changes: 2 additions & 2 deletions checkstyle-suppressions.xml
@@ -24,8 +24,8 @@
<suppressions>

<suppress checks="Javadoc.*" files=".*[/\\]src[/\\]test[/\\]java[/\\].*"/>
<suppress checks=".*" files=".*[/\\]antlr4[/\\]io[/\\]siddhi[/\\]query[/\\]compiler[/\\]*" />
<suppress checks=".*" files=".*[/\\]src[/\\]gen[/\\]java[/\\]io[/\\]siddhi[/\\]service[/\\]*" />
<suppress checks=".*" files=".*[/\\]antlr4[/\\]io[/\\]siddhi[/\\]query[/\\]compiler[/\\]*"/>
<suppress checks=".*" files=".*[/\\]src[/\\]gen[/\\]java[/\\]io[/\\]siddhi[/\\]service[/\\]*"/>

<suppress checks="JavadocPackage" files=".*[/\\]src[/\\](main|integration)[/\\]java[/\\].*"/>
<suppress checks="JavadocPackage" files=".*[/\\]src[/\\].*[/\\]internal[/\\].*"/>
2 changes: 1 addition & 1 deletion docs/api/latest.md
@@ -1,4 +1,4 @@
# API Docs - v5.0.0-m4
# API Docs - v5.0.0-SNAPSHOT

## Core

2 changes: 1 addition & 1 deletion docs/documentation/siddhi-4.0.md
@@ -2706,7 +2706,7 @@ Siddhi supports following extension types:

For each event, it consumes zero or more parameters as input parameters and returns a single attribute with aggregated results. This can be used in conjunction with a window in order to find the aggregated results based on the given window like any Aggregate Function operation.

This is implemented by extending `io.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator`.
This is implemented by extending `io.siddhi.core.query.selector.attribute.aggregator.AttributeAggregatorExecutor`.

Example :

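As a rough illustration of extending the renamed AttributeAggregatorExecutor, the sketch below implements a sum-style aggregator against the State-based API introduced in this PR. The class name SumDoubleAggregatorExecutor and its package are hypothetical, the abstract-method signatures are inferred from this changeset rather than confirmed against the released Siddhi 5 sources, and the @Extension annotation normally required to register an extension is omitted.

```java
package io.siddhi.example; // hypothetical package for this sketch

import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.selector.attribute.aggregator.AttributeAggregatorExecutor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.Attribute;

import java.util.HashMap;
import java.util.Map;

// Sketch only: signatures assumed from this PR's State-based API.
public class SumDoubleAggregatorExecutor
        extends AttributeAggregatorExecutor<SumDoubleAggregatorExecutor.SumState> {

    @Override
    protected StateFactory<SumState> init(ExpressionExecutor[] attributeExpressionExecutors,
                                          ProcessingMode processingMode,
                                          boolean outputExpectsExpiredEvents,
                                          ConfigReader configReader,
                                          SiddhiQueryContext siddhiQueryContext) {
        // Instead of holding fields on the executor, return a StateFactory so the
        // runtime can create, partition, snapshot, and purge state per group-by key.
        return SumState::new;
    }

    @Override
    public Object processAdd(Object data, SumState state) {
        state.sum += (Double) data;
        return state.sum;
    }

    @Override
    public Object processAdd(Object[] data, SumState state) {
        return processAdd(data[0], state);
    }

    @Override
    public Object processRemove(Object data, SumState state) {
        state.sum -= (Double) data;
        return state.sum;
    }

    @Override
    public Object processRemove(Object[] data, SumState state) {
        return processRemove(data[0], state);
    }

    @Override
    public Object reset(SumState state) {
        state.sum = 0.0;
        return state.sum;
    }

    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.DOUBLE;
    }

    // Query-local state: the engine calls snapshot()/restore() when persisting.
    static class SumState extends State {

        private double sum = 0.0;

        @Override
        public boolean canDestroy() {
            return sum == 0.0;
        }

        @Override
        public Map<String, Object> snapshot() {
            Map<String, Object> snapshot = new HashMap<>();
            snapshot.put("sum", sum);
            return snapshot;
        }

        @Override
        public void restore(Map<String, Object> savedState) {
            sum = (Double) savedState.get("sum");
        }
    }
}
```

Keeping all runtime values inside the returned State, rather than in executor fields, is what lets the runtime localize, snapshot, and purge aggregator state per partition and per group-by key, which is the point of this change set.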
20 changes: 20 additions & 0 deletions findbugs-exclude.xml
@@ -172,6 +172,26 @@
<Class name="io.siddhi.core.table.holder.ListEventHolder"/>
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
</Match>
<Match>
<Class name="io.siddhi.core.query.output.ratelimit.snapshot.AggregationGroupByWindowedPerSnapshotOutputRateLimiter"/>
<Bug pattern="BC_UNCONFIRMED_CAST"/>
</Match>
<Match>
<Class name="io.siddhi.core.query.processor.stream.window.DelayWindowProcessor"/>
<Bug pattern="BC_UNCONFIRMED_CAST"/>
</Match>
<Match>
<Class name="io.siddhi.core.query.processor.stream.window.SessionWindowProcessor"/>
<Bug pattern="DMI_ENTRY_SETS_MAY_REUSE_ENTRY_OBJECTS"/>
</Match>
<Match>
<Class name="io.siddhi.core.util.Scheduler$SchedulerState"/>
<Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
</Match>
<Match>
<Class name="io.siddhi.core.util.snapshot.state.SingleSyncStateHolder"/>
<Bug pattern="DC_DOUBLECHECK"/>
</Match>


<Match>
@@ -79,7 +79,7 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
AnnotationConstants.SINK_MAPPER_SUPER_CLASS,
AnnotationConstants.SINK_SUPER_CLASS,
AnnotationConstants.FUNCTION_EXECUTOR_SUPER_CLASS,
AnnotationConstants.AGGREGATION_ATTRIBUTE_SUPER_CLASS,
AnnotationConstants.AGGREGATION_ATTRIBUTE_EXECUTOR_SUPER_CLASS,
AnnotationConstants.DISTRIBUTION_STRATEGY_SUPER_CLASS,
AnnotationConstants.STREAM_PROCESSOR_SUPER_CLASS,
AnnotationConstants.STREAM_FUNCTION_PROCESSOR_SUPER_CLASS,
@@ -117,7 +117,7 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
abstractAnnotationProcessor =
new FunctionExecutorValidationAnnotationProcessor(extensionClassFullName);
break;
case AnnotationConstants.AGGREGATION_ATTRIBUTE_SUPER_CLASS:
case AnnotationConstants.AGGREGATION_ATTRIBUTE_EXECUTOR_SUPER_CLASS:
abstractAnnotationProcessor =
new AggregationAttributeValidationAnnotationProcessor(extensionClassFullName);
break;
@@ -28,8 +28,8 @@ public class AnnotationConstants {
"io.siddhi.core.function.Script";
public static final String FUNCTION_EXECUTOR_SUPER_CLASS =
"io.siddhi.core.executor.function.FunctionExecutor";
public static final String AGGREGATION_ATTRIBUTE_SUPER_CLASS =
"io.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator";
public static final String AGGREGATION_ATTRIBUTE_EXECUTOR_SUPER_CLASS =
"io.siddhi.core.query.selector.attribute.aggregator.AttributeAggregatorExecutor";
public static final String DISTRIBUTION_STRATEGY_SUPER_CLASS =
"io.siddhi.core.stream.output.sink.distributed.DistributionStrategy";
public static final String STREAM_PROCESSOR_SUPER_CLASS =
@@ -52,7 +52,7 @@
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.SiddhiConstants;
import io.siddhi.core.util.StringUtil;
import io.siddhi.core.util.extension.holder.EternalReferencedHolder;
import io.siddhi.core.util.extension.holder.ExternalReferencedHolder;
import io.siddhi.core.util.parser.StoreQueryParser;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.persistence.util.PersistenceHelper;
@@ -243,7 +243,7 @@ public Set<String> getQueryNames() {
public Map<String, Map<String, AbstractDefinition>> getPartitionedInnerStreamDefinitionMap() {
Map<String, Map<String, AbstractDefinition>> innerStreams = new HashMap<>();
for (PartitionRuntime partition : partitionMap.values()) {
innerStreams.put(partition.getElementId(), partition.getLocalStreamDefinitionMap());
innerStreams.put(partition.getPartitionName(), partition.getLocalStreamDefinitionMap());
}
return innerStreams;
}
@@ -390,8 +390,9 @@ public synchronized void startWithoutSources() {
if (siddhiAppContext.isStatsEnabled() && siddhiAppContext.getStatisticsManager() != null) {
siddhiAppContext.getStatisticsManager().startReporting();
}
for (EternalReferencedHolder eternalReferencedHolder : siddhiAppContext.getEternalReferencedHolders()) {
eternalReferencedHolder.start();
for (ExternalReferencedHolder externalReferencedHolder :
siddhiAppContext.getExternalReferencedHolders()) {
externalReferencedHolder.start();
}
for (List<Sink> sinks : sinkMap.values()) {
for (Sink sink : sinks) {
@@ -427,7 +428,7 @@ public void setPurgingEnabled(boolean purgingEnabled) {
this.incrementalDataPurging = purgingEnabled;
}

public synchronized void startSources() {
public void startSources() {
if (running) {
log.warn("Error calling startSources() for Siddhi App '" + siddhiAppContext.getName() + "', " +
"SiddhiApp already started with the sources.");
@@ -465,7 +466,7 @@ public synchronized void shutdown() {
try {
if (sourceHandlerManager != null) {
sourceHandlerManager.unregisterSourceHandler(source.getMapper().getHandler().
getElementId());
getId());
}
source.shutdown();
} catch (Throwable t) {
@@ -494,7 +495,7 @@ public synchronized void shutdown() {
for (Sink sink : sinks) {
try {
if (sinkHandlerManager != null) {
sinkHandlerManager.unregisterSinkHandler(sink.getHandler().getElementId());
sinkHandlerManager.unregisterSinkHandler(sink.getHandler().getId());
}
sink.shutdown();
} catch (Throwable t) {
@@ -514,7 +515,7 @@ public synchronized void shutdown() {
String elementId = null;
RecordTableHandler recordTableHandler = table.getHandler();
if (recordTableHandler != null) {
elementId = recordTableHandler.getElementId();
elementId = recordTableHandler.getId();
}
if (elementId != null) {
recordTableHandlerManager.unregisterRecordTableHandler(elementId);
@@ -523,13 +524,13 @@ public synchronized void shutdown() {
table.shutdown();
}

for (EternalReferencedHolder eternalReferencedHolder : siddhiAppContext.getEternalReferencedHolders()) {
for (ExternalReferencedHolder externalReferencedHolder : siddhiAppContext.getExternalReferencedHolders()) {
try {
eternalReferencedHolder.stop();
externalReferencedHolder.stop();
} catch (Throwable t) {
log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(t, siddhiAppContext)) +
" Error while stopping EternalReferencedHolder '" +
StringUtil.removeCRLFCharacters(eternalReferencedHolder.toString()) + "' down Siddhi app '" +
" Error while stopping ExternalReferencedHolder '" +
StringUtil.removeCRLFCharacters(externalReferencedHolder.toString()) + "' down Siddhi app '" +
StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t);
}
}
@@ -18,7 +18,6 @@

package io.siddhi.core.aggregation;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.MetaStateEvent;
@@ -68,61 +67,61 @@ public class AggregationRuntime implements MemoryCalculable {
private final AggregationDefinition aggregationDefinition;
private final Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMap;
private final Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMapForPartitions;
private final List<ExpressionExecutor> baseExecutorsForFind;
private ExpressionExecutor shouldUpdateTimestamp;
private final Map<TimePeriod.Duration, Table> aggregationTables;
private final SiddhiAppContext siddhiAppContext;
private final MetaStreamEvent tableMetaStreamEvent;
private final MetaStreamEvent aggregateMetaSteamEvent;
private final LatencyTracker latencyTrackerFind;
private final ThroughputTracker throughputTrackerFind;
private final List<List<ExpressionExecutor>> aggregateProcessingExecutorsList;
private final List<GroupByKeyGenerator> groupByKeyGeneratorList;
private List<TimePeriod.Duration> incrementalDurations;
private SingleStreamRuntime singleStreamRuntime;
private List<ExpressionExecutor> baseExecutors;
private List<ExpressionExecutor> outputExpressionExecutors;
private RecreateInMemoryData recreateInMemoryData;
private boolean processingOnExternalTime;
private boolean isFirstEventArrived;
private long lastExecutorsRefreshedTime = -1;
private IncrementalDataPurging incrementalDataPurging;
private ExpressionExecutor shouldUpdateExpressionExecutor;
private String shardId;
private List<List<ExpressionExecutor>> aggregateProcessExpressionExecutorsListForFind;

public AggregationRuntime(AggregationDefinition aggregationDefinition,
Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMap,
Map<TimePeriod.Duration, Table> aggregationTables,
SingleStreamRuntime singleStreamRuntime,
List<TimePeriod.Duration> incrementalDurations,
SiddhiAppContext siddhiAppContext, List<ExpressionExecutor> baseExecutors,
MetaStreamEvent tableMetaStreamEvent,
List<ExpressionExecutor> outputExpressionExecutors,
LatencyTracker latencyTrackerFind, ThroughputTracker throughputTrackerFind,
RecreateInMemoryData recreateInMemoryData, boolean processingOnExternalTime,
List<List<ExpressionExecutor>> aggregateProcessingExecutorsList,
List<GroupByKeyGenerator> groupByKeyGeneratorList,
IncrementalDataPurging incrementalDataPurging,
ExpressionExecutor shouldUpdateExpressionExecutor, String shardId,
Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMapForPartitions) {
String shardId,
Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMapForPartitions,
ExpressionExecutor shouldUpdateTimestamp,
List<List<ExpressionExecutor>> aggregateProcessExpressionExecutorsListForFind) {
this.aggregationDefinition = aggregationDefinition;
this.incrementalExecutorMap = incrementalExecutorMap;
this.aggregationTables = aggregationTables;
this.incrementalDurations = incrementalDurations;
this.siddhiAppContext = siddhiAppContext;
this.singleStreamRuntime = singleStreamRuntime;
this.baseExecutors = baseExecutors;
this.tableMetaStreamEvent = tableMetaStreamEvent;
this.outputExpressionExecutors = outputExpressionExecutors;
this.latencyTrackerFind = latencyTrackerFind;
this.throughputTrackerFind = throughputTrackerFind;
this.recreateInMemoryData = recreateInMemoryData;
this.processingOnExternalTime = processingOnExternalTime;
this.aggregateProcessingExecutorsList = aggregateProcessingExecutorsList;
this.groupByKeyGeneratorList = groupByKeyGeneratorList;
this.incrementalDataPurging = incrementalDataPurging;
this.shouldUpdateExpressionExecutor = shouldUpdateExpressionExecutor;
this.shardId = shardId;
this.incrementalExecutorMapForPartitions = incrementalExecutorMapForPartitions;
aggregateMetaSteamEvent = new MetaStreamEvent();
this.shouldUpdateTimestamp = shouldUpdateTimestamp;
this.aggregateProcessExpressionExecutorsListForFind = aggregateProcessExpressionExecutorsListForFind;
this.aggregateMetaSteamEvent = new MetaStreamEvent();
//Without timestamp executor
this.baseExecutorsForFind = aggregateProcessExpressionExecutorsListForFind.get(0).subList(1,
aggregateProcessExpressionExecutorsListForFind.get(0).size());
aggregationDefinition.getAttributeList().forEach(aggregateMetaSteamEvent::addOutputData);
}

@@ -196,11 +195,11 @@ public SingleStreamRuntime getSingleStreamRuntime() {
return singleStreamRuntime;
}

public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {

public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition,
SiddhiQueryContext siddhiQueryContext) {
try {
SnapshotService.getSkipSnapshotableThreadLocal().set(true);
if (latencyTrackerFind != null && siddhiAppContext.isStatsEnabled()) {
SnapshotService.getSkipStateStorageThreadLocal().set(true);
if (latencyTrackerFind != null && siddhiQueryContext.getSiddhiAppContext().isStatsEnabled()) {
latencyTrackerFind.markIn();
throughputTrackerFind.eventIn();
}
@@ -215,12 +214,13 @@ public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCond
}
return ((IncrementalAggregateCompileCondition) compiledCondition).find(matchingEvent,
aggregationDefinition, incrementalExecutorMap, aggregationTables, incrementalDurations,
baseExecutors, outputExpressionExecutors, siddhiAppContext,
aggregateProcessingExecutorsList, groupByKeyGeneratorList, shouldUpdateExpressionExecutor,
baseExecutorsForFind, outputExpressionExecutors, siddhiQueryContext,
aggregateProcessExpressionExecutorsListForFind,
groupByKeyGeneratorList, shouldUpdateTimestamp,
incrementalExecutorMapForPartitions);
} finally {
SnapshotService.getSkipSnapshotableThreadLocal().set(null);
if (latencyTrackerFind != null && siddhiAppContext.isStatsEnabled()) {
SnapshotService.getSkipStateStorageThreadLocal().set(null);
if (latencyTrackerFind != null && siddhiQueryContext.getSiddhiAppContext().isStatsEnabled()) {
latencyTrackerFind.markOut();
}
}
@@ -348,8 +348,7 @@ public CompiledCondition compileExpression(Expression expression, Within within,
// "on stream1.name == aggregator.nickName ..." in the join query) must be executed on that data.
// This condition is used for that purpose.
onCompiledCondition = OperatorParser.constructOperator(new ComplexEventChunk<>(true), expression,
matchingMetaInfoHolder, variableExpressionExecutors, tableMap,
siddhiQueryContext);
matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);

return new IncrementalAggregateCompileCondition(withinTableCompiledConditions, withinInMemoryCompileCondition,
onCompiledCondition, tableMetaStreamEvent, aggregateMetaSteamEvent, additionalAttributes,