Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose API to check whether the Siddhi App is stateful or not #1373 #1413

Merged
merged 3 commits into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
<Bug pattern="VO_VOLATILE_INCREMENT"/>
</Match>
<Match>
<Class name="io.siddhi.core.partition.PartitionRuntime"/>
<Class name="io.siddhi.core.partition.PartitionRuntimeImpl"/>
<Bug pattern="RV_RETURN_VALUE_OF_PUTIFABSENT_IGNORED"/>
</Match>
<Match>
Expand Down
803 changes: 73 additions & 730 deletions modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntime.java

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class SiddhiAppContext {
private List<Scheduler> schedulerList;
private static final ThreadLocal<String> GROUP_BY_KEY = new ThreadLocal<>();
private static final ThreadLocal<String> PARTITION_KEY = new ThreadLocal<>();
private SiddhiApp siddhiApp;

public SiddhiAppContext() {
this.externalReferencedHolders = Collections.synchronizedList(new LinkedList<>());
Expand Down Expand Up @@ -264,6 +265,14 @@ public void setSiddhiAppString(String siddhiAppString) {
this.siddhiAppString = siddhiAppString;
}

public void setSiddhiApp(SiddhiApp siddhiApp) {
this.siddhiApp = siddhiApp;
}

public SiddhiApp getSiddhiApp() {
return siddhiApp;
}

public List<String> getIncludedMetrics() {
return includedMetrics;
}
Expand Down Expand Up @@ -311,4 +320,5 @@ public StateHolder generateStateHolder(String name, StateFactory stateFactory, b
return new EmptyStateHolder();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public class SiddhiQueryContext {
private boolean partitioned;
private OutputStream.OutputEventType outputEventType;
private LatencyTracker latencyTracker;
private Map<String, StateHolder> stateHolderMap;
private IdGenerator idGenerator;
private boolean stateful = false;

public SiddhiQueryContext(SiddhiAppContext siddhiAppContext, String queryName) {
this(siddhiAppContext, queryName, SiddhiConstants.PARTITION_ID_DEFAULT);
Expand Down Expand Up @@ -130,12 +130,18 @@ public StateHolder generateStateHolder(String name, boolean groupBy, StateFactor

if (SnapshotService.getSkipStateStorageThreadLocal().get() == null ||
!SnapshotService.getSkipStateStorageThreadLocal().get()) {
stateHolderMap = siddhiAppContext.getSnapshotService().getStateHolderMap(partitionId, this.getName());
Map<String, StateHolder> stateHolderMap =
siddhiAppContext.getSnapshotService().getStateHolderMap(partitionId, this.getName());
stateHolderMap.put(idGenerator.createNewId() + "-" + name, stateHolder);
}
stateful = true;
return stateHolder;
} else {
return new EmptyStateHolder();
}
}

public boolean isStateful() {
return stateful;
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.siddhi.core.event.stream.converter.StreamEventConverterFactory;
import io.siddhi.core.partition.executor.PartitionExecutor;
import io.siddhi.core.query.QueryRuntime;
import io.siddhi.core.query.QueryRuntimeImpl;
import io.siddhi.core.query.input.stream.StreamRuntime;
import io.siddhi.core.stream.StreamJunction;
import io.siddhi.query.api.definition.StreamDefinition;
Expand All @@ -48,7 +49,7 @@ public class PartitionStreamReceiver implements StreamJunction.Receiver {
private MetaStreamEvent metaStreamEvent;
private StreamDefinition streamDefinition;
private SiddhiAppContext siddhiAppContext;
private PartitionRuntime partitionRuntime;
private PartitionRuntimeImpl partitionRuntime;
private List<PartitionExecutor> partitionExecutors;
private Map<String, StreamJunction> streamJunctionMap = new HashMap<>();

Expand All @@ -59,7 +60,7 @@ public PartitionStreamReceiver(SiddhiAppContext siddhiAppContext, MetaStreamEven
PartitionRuntime partitionRuntime) {
this.metaStreamEvent = metaStreamEvent;
this.streamDefinition = streamDefinition;
this.partitionRuntime = partitionRuntime;
this.partitionRuntime = (PartitionRuntimeImpl) partitionRuntime;
this.partitionExecutors = partitionExecutors;
this.siddhiAppContext = siddhiAppContext;
this.streamId = streamDefinition.getId();
Expand Down Expand Up @@ -298,8 +299,8 @@ public void addStreamJunction(List<QueryRuntime> queryRuntimeList) {
streamJunctionMap.put(streamId, streamJunction);
}
for (QueryRuntime queryRuntime : queryRuntimeList) {
StreamRuntime streamRuntime = queryRuntime.getStreamRuntime();
for (int i = 0; i < queryRuntime.getInputStreamId().size(); i++) {
StreamRuntime streamRuntime = ((QueryRuntimeImpl) queryRuntime).getStreamRuntime();
for (int i = 0; i < ((QueryRuntimeImpl) queryRuntime).getInputStreamId().size(); i++) {
if ((streamRuntime.getSingleStreamRuntimes().get(i)).
getProcessStreamReceiver().getStreamId().equals(streamId)) {
streamJunction.subscribe((streamRuntime.getSingleStreamRuntimes().get(i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,157 +17,16 @@
*/
package io.siddhi.core.query;

import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.MetaComplexEvent;
import io.siddhi.core.query.input.MultiProcessStreamReceiver;
import io.siddhi.core.query.input.stream.StreamRuntime;
import io.siddhi.core.query.input.stream.single.SingleStreamRuntime;
import io.siddhi.core.query.input.stream.state.StateStreamRuntime;
import io.siddhi.core.query.output.callback.OutputCallback;
import io.siddhi.core.query.output.callback.QueryCallback;
import io.siddhi.core.query.output.ratelimit.OutputRateLimiter;
import io.siddhi.core.query.selector.QuerySelector;
import io.siddhi.core.util.extension.holder.ExternalReferencedHolder;
import io.siddhi.core.util.statistics.MemoryCalculable;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.execution.query.Query;
import io.siddhi.query.api.execution.query.input.stream.JoinInputStream;
import io.siddhi.query.api.execution.query.input.stream.SingleInputStream;
import io.siddhi.query.api.execution.query.input.stream.StateInputStream;

import java.util.List;

/**
* Query Runtime represent holder object for a single Siddhi query and holds all runtime objects related to that query.
*/
public class QueryRuntime implements MemoryCalculable, ExternalReferencedHolder {

private StreamRuntime streamRuntime;
private OutputRateLimiter outputRateLimiter;
private Query query;
private OutputCallback outputCallback;
private SiddhiQueryContext siddhiQueryContext;
private StreamDefinition outputStreamDefinition;
private boolean toLocalStream;
private QuerySelector selector;
private MetaComplexEvent metaComplexEvent;

public QueryRuntime(Query query, StreamRuntime streamRuntime, QuerySelector selector,
OutputRateLimiter outputRateLimiter, OutputCallback outputCallback,
MetaComplexEvent metaComplexEvent,
SiddhiQueryContext siddhiQueryContext) {
this.query = query;
this.streamRuntime = streamRuntime;
this.selector = selector;
this.outputCallback = outputCallback;
this.siddhiQueryContext = siddhiQueryContext;
outputRateLimiter.setOutputCallback(outputCallback);
setOutputRateLimiter(outputRateLimiter);
setMetaComplexEvent(metaComplexEvent);
init();
}

public String getQueryId() {
return siddhiQueryContext.getName();
}

public void addCallback(QueryCallback callback) {
outputRateLimiter.addQueryCallback(callback);
}

public OutputRateLimiter getOutputRateManager() {
return outputRateLimiter;
}

public StreamDefinition getOutputStreamDefinition() {
return outputStreamDefinition;
}

public List<String> getInputStreamId() {
return query.getInputStream().getAllStreamIds();
}

public boolean isToLocalStream() {
return toLocalStream;
}

public void setToLocalStream(boolean toLocalStream) {
this.toLocalStream = toLocalStream;
}

public boolean isFromLocalStream() {
if (query.getInputStream() instanceof SingleInputStream) {
return ((SingleInputStream) query.getInputStream()).isInnerStream();
} else if (query.getInputStream() instanceof JoinInputStream) {
return ((SingleInputStream) ((JoinInputStream) query.getInputStream()).getLeftInputStream())
.isInnerStream() || ((SingleInputStream) ((JoinInputStream) query.getInputStream())
.getRightInputStream()).isInnerStream();
} else if (query.getInputStream() instanceof StateInputStream) {
for (String streamId : query.getInputStream().getAllStreamIds()) {
if (streamId.startsWith("#")) {
return true;
}
}
}
return false;
}

private void setOutputRateLimiter(OutputRateLimiter outputRateLimiter) {
this.outputRateLimiter = outputRateLimiter;
selector.setNextProcessor(outputRateLimiter);
}

public SiddhiQueryContext getSiddhiQueryContext() {
return siddhiQueryContext;
}

public StreamRuntime getStreamRuntime() {
return streamRuntime;
}

public MetaComplexEvent getMetaComplexEvent() {
return metaComplexEvent;
}

private void setMetaComplexEvent(MetaComplexEvent metaComplexEvent) {
outputStreamDefinition = metaComplexEvent.getOutputStreamDefinition();
this.metaComplexEvent = metaComplexEvent;
}

public Query getQuery() {
return query;
}

public OutputCallback getOutputCallback() {
return outputCallback;
}

public void init() {
streamRuntime.setCommonProcessor(selector);
for (SingleStreamRuntime singleStreamRuntime : streamRuntime.getSingleStreamRuntimes()) {
if (singleStreamRuntime.getProcessStreamReceiver() instanceof MultiProcessStreamReceiver) {
((MultiProcessStreamReceiver) singleStreamRuntime.getProcessStreamReceiver())
.setOutputRateLimiter(outputRateLimiter);
}
}
}

public QuerySelector getSelector() {
return selector;
}
public interface QueryRuntime {

public void initPartition() {
if (streamRuntime instanceof StateStreamRuntime) {
((StateStreamRuntime) streamRuntime).initPartition();
}
outputRateLimiter.partitionCreated();
}
String getQueryId();

public void start() {
initPartition();
}
boolean isStateful();

@Override
public void stop() {
}
Query getQuery();
}
Loading