diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index 977f66c05e..4b865b7264 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -97,7 +97,7 @@
-
+
diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntime.java
index ee8044b25a..4c4aa8969d 100644
--- a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntime.java
+++ b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntime.java
@@ -19,793 +19,153 @@
package io.siddhi.core;
import com.lmax.disruptor.ExceptionHandler;
-import io.siddhi.core.aggregation.AggregationRuntime;
-import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.debugger.SiddhiDebugger;
import io.siddhi.core.event.Event;
import io.siddhi.core.exception.CannotClearSiddhiAppStateException;
import io.siddhi.core.exception.CannotRestoreSiddhiAppStateException;
-import io.siddhi.core.exception.DefinitionNotExistException;
-import io.siddhi.core.exception.QueryNotExistException;
-import io.siddhi.core.exception.SiddhiAppRuntimeException;
-import io.siddhi.core.exception.StoreQueryCreationException;
import io.siddhi.core.partition.PartitionRuntime;
import io.siddhi.core.query.QueryRuntime;
-import io.siddhi.core.query.StoreQueryRuntime;
-import io.siddhi.core.query.input.stream.StreamRuntime;
-import io.siddhi.core.query.input.stream.single.SingleStreamRuntime;
-import io.siddhi.core.query.output.callback.OutputCallback;
import io.siddhi.core.query.output.callback.QueryCallback;
-import io.siddhi.core.stream.StreamJunction;
import io.siddhi.core.stream.input.InputHandler;
-import io.siddhi.core.stream.input.InputManager;
import io.siddhi.core.stream.input.source.Source;
-import io.siddhi.core.stream.input.source.SourceHandlerManager;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.core.stream.output.sink.Sink;
-import io.siddhi.core.stream.output.sink.SinkCallback;
-import io.siddhi.core.stream.output.sink.SinkHandlerManager;
import io.siddhi.core.table.Table;
-import io.siddhi.core.table.record.RecordTableHandler;
-import io.siddhi.core.table.record.RecordTableHandlerManager;
-import io.siddhi.core.util.ExceptionUtil;
-import io.siddhi.core.util.Scheduler;
-import io.siddhi.core.util.SiddhiConstants;
-import io.siddhi.core.util.StringUtil;
-import io.siddhi.core.util.extension.holder.ExternalReferencedHolder;
-import io.siddhi.core.util.parser.StoreQueryParser;
-import io.siddhi.core.util.parser.helper.QueryParserHelper;
-import io.siddhi.core.util.persistence.util.PersistenceHelper;
+import io.siddhi.core.trigger.Trigger;
import io.siddhi.core.util.snapshot.PersistenceReference;
-import io.siddhi.core.util.statistics.BufferedEventsTracker;
-import io.siddhi.core.util.statistics.LatencyTracker;
-import io.siddhi.core.util.statistics.MemoryUsageTracker;
import io.siddhi.core.util.statistics.metrics.Level;
import io.siddhi.core.window.Window;
+import io.siddhi.query.api.SiddhiApp;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.AggregationDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.definition.WindowDefinition;
-import io.siddhi.query.api.exception.SiddhiAppContextException;
import io.siddhi.query.api.execution.query.StoreQuery;
-import io.siddhi.query.compiler.SiddhiCompiler;
-import org.apache.log4j.Logger;
import java.beans.ExceptionListener;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
/**
* Keep streamDefinitions, partitionRuntimes, queryRuntimes of an SiddhiApp and streamJunctions and inputHandlers used.
*/
-public class SiddhiAppRuntime {
- private static final Logger log = Logger.getLogger(SiddhiAppRuntime.class);
- private final Map windowMap;
- private final Map> sourceMap;
- private final Map> sinkMap;
- private ConcurrentMap aggregationMap;
- private Map streamDefinitionMap =
- new ConcurrentHashMap(); // Contains stream definition.
- private Map tableDefinitionMap =
- new ConcurrentHashMap(); // Contains table definition.
- private Map windowDefinitionMap =
- new ConcurrentHashMap(); // Contains window definition.
- private Map aggregationDefinitionMap =
- new ConcurrentHashMap(); // Contains aggregation definition.
- private InputManager inputManager;
- private Map queryProcessorMap =
- Collections.synchronizedMap(new LinkedHashMap());
- private Map streamJunctionMap =
- new ConcurrentHashMap(); // Contains stream junctions.
- private Map tableMap = new ConcurrentHashMap(); // Contains event tables.
- private Map partitionMap =
- new ConcurrentHashMap(); // Contains partitions.
- private LinkedHashMap storeQueryRuntimeMap =
- new LinkedHashMap<>(); // Contains partitions.
- private SiddhiAppContext siddhiAppContext;
- private Map siddhiAppRuntimeMap;
- private MemoryUsageTracker memoryUsageTracker;
- private BufferedEventsTracker bufferedEventsTracker;
- private LatencyTracker storeQueryLatencyTracker;
- private SiddhiDebugger siddhiDebugger;
- private boolean running = false;
- private boolean runningWithoutSources = false;
- private Future futureIncrementalPersistor;
- private boolean incrementalDataPurging = true;
-
-
- public SiddhiAppRuntime(Map streamDefinitionMap,
- Map tableDefinitionMap,
- Map windowDefinitionMap,
- Map aggregationDefinitionMap,
- InputManager inputManager,
- Map queryProcessorMap,
- Map streamJunctionMap,
- Map tableMap,
- Map windowMap,
- ConcurrentMap aggregationMap,
- Map> sourceMap,
- Map> sinkMap,
- Map partitionMap,
- SiddhiAppContext siddhiAppContext,
- Map siddhiAppRuntimeMap) {
- this.streamDefinitionMap = streamDefinitionMap;
- this.tableDefinitionMap = tableDefinitionMap;
- this.windowDefinitionMap = windowDefinitionMap;
- this.aggregationDefinitionMap = aggregationDefinitionMap;
- this.inputManager = inputManager;
- this.queryProcessorMap = queryProcessorMap;
- this.streamJunctionMap = streamJunctionMap;
- this.tableMap = tableMap;
- this.windowMap = windowMap;
- this.aggregationMap = aggregationMap;
- this.sourceMap = sourceMap;
- this.sinkMap = sinkMap;
- this.partitionMap = partitionMap;
- this.siddhiAppContext = siddhiAppContext;
- this.siddhiAppRuntimeMap = siddhiAppRuntimeMap;
- if (siddhiAppContext.getStatisticsManager() != null) {
- monitorQueryMemoryUsage();
- monitorBufferedEvents();
- storeQueryLatencyTracker = QueryParserHelper.createLatencyTracker(siddhiAppContext, "query",
- SiddhiConstants.METRIC_INFIX_STORE_QUERIES, null);
- }
-
- for (Map.Entry> sinkEntries : sinkMap.entrySet()) {
- addCallback(sinkEntries.getKey(),
- new SinkCallback(sinkEntries.getValue(), streamDefinitionMap.get(sinkEntries.getKey())));
- }
- for (Map.Entry> sourceEntries : sourceMap.entrySet()) {
- InputHandler inputHandler = getInputHandler(sourceEntries.getKey());
- for (Source source : sourceEntries.getValue()) {
- source.getMapper().setInputHandler(inputHandler);
- }
- }
- }
-
- public String getName() {
- return siddhiAppContext.getName();
- }
+public interface SiddhiAppRuntime {
+
+
+ String getName();
+
+ SiddhiApp getSiddhiApp();
/**
* Get the stream definition map.
*
* @return Map of {@link StreamDefinition}s.
*/
- public Map getStreamDefinitionMap() {
- return streamDefinitionMap.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> (StreamDefinition) e.getValue()));
- }
+ Map getStreamDefinitionMap();
/**
* Get the table definition map.
*
* @return Map of {@link TableDefinition}s.
*/
- public Map getTableDefinitionMap() {
- return tableDefinitionMap.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> (TableDefinition) e.getValue()));
- }
+ Map getTableDefinitionMap();
/**
* Get the window definition map.
*
* @return Map of {@link WindowDefinition}s.
*/
- public Map getWindowDefinitionMap() {
- return windowDefinitionMap.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> (WindowDefinition) e.getValue()));
- }
+ Map getWindowDefinitionMap();
/**
* Get the aggregation definition map.
*
* @return Map of {@link AggregationDefinition}s.
*/
- public Map getAggregationDefinitionMap() {
- return aggregationDefinitionMap.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> (AggregationDefinition) e.getValue()));
- }
+ Map getAggregationDefinitionMap();
/**
* Get the names of the available queries.
*
* @return string set of query names.
*/
- public Set getQueryNames() {
- return queryProcessorMap.keySet();
- }
-
- public Map> getPartitionedInnerStreamDefinitionMap() {
- Map> innerStreams = new HashMap<>();
- for (PartitionRuntime partition : partitionMap.values()) {
- innerStreams.put(partition.getPartitionName(), partition.getLocalStreamDefinitionMap());
- }
- return innerStreams;
- }
-
- public void addCallback(String streamId, StreamCallback streamCallback) {
- streamCallback.setStreamId(streamId);
- StreamJunction streamJunction = streamJunctionMap.get(streamId);
- if (streamJunction == null) {
- throw new DefinitionNotExistException("No stream found with name: " + streamId);
- }
- streamCallback.setStreamDefinition(streamDefinitionMap.get(streamId));
- streamCallback.setContext(siddhiAppContext);
- streamJunction.subscribe(streamCallback);
- }
-
- public void addCallback(String queryName, QueryCallback callback) {
- callback.setContext(siddhiAppContext);
- QueryRuntime queryRuntime = queryProcessorMap.get(queryName);
- if (queryRuntime == null) {
- throw new QueryNotExistException("No query found with name: " + queryName);
- }
- callback.setQuery(queryRuntime.getQuery());
- queryRuntime.addCallback(callback);
- }
-
- public Event[] query(String storeQuery) {
- return query(SiddhiCompiler.parseStoreQuery(storeQuery), storeQuery);
- }
-
- public Event[] query(StoreQuery storeQuery) {
- return query(storeQuery, null);
- }
-
- private Event[] query(StoreQuery storeQuery, String storeQueryString) {
- try {
- if (Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0 &&
- storeQueryLatencyTracker != null) {
- storeQueryLatencyTracker.markIn();
- }
- StoreQueryRuntime storeQueryRuntime;
- synchronized (this) {
- storeQueryRuntime = storeQueryRuntimeMap.remove(storeQuery);
- if (storeQueryRuntime == null) {
- storeQueryRuntime = StoreQueryParser.parse(storeQuery, siddhiAppContext, tableMap, windowMap,
- aggregationMap);
- } else {
- storeQueryRuntime.reset();
- }
- storeQueryRuntimeMap.put(storeQuery, storeQueryRuntime);
- if (storeQueryRuntimeMap.size() > 50) {
- Iterator i = storeQueryRuntimeMap.entrySet().iterator();
- if (i.hasNext()) {
- i.next();
- i.remove();
- }
- }
- }
- return storeQueryRuntime.execute();
- } catch (RuntimeException e) {
- if (e instanceof SiddhiAppContextException) {
- throw new StoreQueryCreationException(((SiddhiAppContextException) e).getMessageWithOutContext(), e,
- ((SiddhiAppContextException) e).getQueryContextStartIndex(),
- ((SiddhiAppContextException) e).getQueryContextEndIndex(), null, storeQueryString);
- }
- throw new StoreQueryCreationException(e.getMessage(), e);
- } finally {
- if (Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0 &&
- storeQueryLatencyTracker != null) {
- storeQueryLatencyTracker.markOut();
- }
- }
- }
-
- public Attribute[] getStoreQueryOutputAttributes(String storeQuery) {
- return getStoreQueryOutputAttributes(SiddhiCompiler.parseStoreQuery(storeQuery), storeQuery);
- }
-
- public Attribute[] getStoreQueryOutputAttributes(StoreQuery storeQuery) {
- return getStoreQueryOutputAttributes(storeQuery, null);
- }
+ Set getQueryNames();
+ Map> getPartitionedInnerStreamDefinitionMap();
- /**
- * This method get the storeQuery and return the corresponding output and its types.
- *
- * @param storeQuery this storeQuery is processed and get the output attributes.
- * @param storeQueryString this passed to report errors with context if there are any.
- * @return List of output attributes
- */
- private Attribute[] getStoreQueryOutputAttributes(StoreQuery storeQuery, String storeQueryString) {
- try {
- StoreQueryRuntime storeQueryRuntime = storeQueryRuntimeMap.get(storeQuery);
- if (storeQueryRuntime == null) {
- storeQueryRuntime = StoreQueryParser.parse(storeQuery, siddhiAppContext, tableMap, windowMap,
- aggregationMap);
- storeQueryRuntimeMap.put(storeQuery, storeQueryRuntime);
- }
- return storeQueryRuntime.getStoreQueryOutputAttributes();
- } catch (RuntimeException e) {
- if (e instanceof SiddhiAppContextException) {
- throw new StoreQueryCreationException(((SiddhiAppContextException) e).getMessageWithOutContext(), e,
- ((SiddhiAppContextException) e).getQueryContextStartIndex(),
- ((SiddhiAppContextException) e).getQueryContextEndIndex(), null, siddhiAppContext
- .getSiddhiAppString());
- }
- throw new StoreQueryCreationException(e.getMessage(), e);
- }
- }
-
- public InputHandler getInputHandler(String streamId) {
- return inputManager.getInputHandler(streamId);
- }
-
- public Collection> getSources() {
- return sourceMap.values();
- }
-
- public Collection> getSinks() {
- return sinkMap.values();
- }
-
- public Collection getTables() {
- return tableMap.values();
- }
-
- public synchronized void start() {
- if (running) {
- log.warn("Error calling start() for Siddhi App '" + siddhiAppContext.getName() + "', " +
- "SiddhiApp already started.");
- return;
- }
- if (!runningWithoutSources) {
- startWithoutSources();
- }
- if (runningWithoutSources) {
- startSources();
- }
- }
-
- public synchronized void startWithoutSources() {
- if (running || runningWithoutSources) {
- log.warn("Error calling startWithoutSources() for Siddhi App '" + siddhiAppContext.getName() + "', " +
- "SiddhiApp already started.");
- } else {
- try {
- memoryUsageTracker.disableMemoryUsageMetrics();
- if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.OFF) != 0 &&
- siddhiAppContext.getStatisticsManager() != null) {
- if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.DETAIL) == 0) {
- memoryUsageTracker.enableMemoryUsageMetrics();
- }
- siddhiAppContext.getStatisticsManager().startReporting();
- }
- for (ExternalReferencedHolder externalReferencedHolder :
- siddhiAppContext.getExternalReferencedHolders()) {
- externalReferencedHolder.start();
- }
- for (List sinks : sinkMap.values()) {
- for (Sink sink : sinks) {
- sink.connectWithRetry();
- }
- }
- for (Table table : tableMap.values()) {
- table.connectWithRetry();
- }
- for (StreamJunction streamJunction : streamJunctionMap.values()) {
- streamJunction.startProcessing();
- }
- if (incrementalDataPurging) {
- for (AggregationRuntime aggregationRuntime : aggregationMap.values()) {
- aggregationRuntime.startPurging();
- }
- }
- runningWithoutSources = true;
- } catch (Throwable t) {
- log.error("Error starting Siddhi App '" + siddhiAppContext.getName() + "', " +
- "triggering shutdown process. " + t.getMessage());
- try {
- shutdown();
- } catch (Throwable t1) {
- log.error("Error shutting down partially started Siddhi App '" + siddhiAppContext.getName() + "', "
- + t1.getMessage());
- }
- }
- }
- }
-
- public void setPurgingEnabled(boolean purgingEnabled) {
- this.incrementalDataPurging = purgingEnabled;
- }
-
- public void startSources() {
- if (running) {
- log.warn("Error calling startSources() for Siddhi App '" + siddhiAppContext.getName() + "', " +
- "SiddhiApp already started with the sources.");
- return;
- }
- if (!runningWithoutSources) {
- throw new SiddhiAppRuntimeException("Cannot call startSources() without calling startWithoutSources() " +
- "for Siddhi App '" + siddhiAppContext.getName() + "'");
- } else {
- try {
- for (List sources : sourceMap.values()) {
- for (Source source : sources) {
- source.connectWithRetry();
- }
- }
- running = true;
- runningWithoutSources = false;
- } catch (Throwable t) {
- log.error("Error starting Siddhi App '" + siddhiAppContext.getName() + "', " +
- "triggering shutdown process. " + t.getMessage());
- try {
- shutdown();
- } catch (Throwable t1) {
- log.error("Error shutting down partially started Siddhi App '" + siddhiAppContext.getName() + "', "
- + t1.getMessage());
- }
- }
- }
- }
-
- public synchronized void shutdown() {
- SourceHandlerManager sourceHandlerManager = siddhiAppContext.getSiddhiContext().getSourceHandlerManager();
- for (List sources : sourceMap.values()) {
- for (Source source : sources) {
- try {
- if (sourceHandlerManager != null) {
- sourceHandlerManager.unregisterSourceHandler(source.getMapper().getHandler().
- getId());
- }
- source.shutdown();
- } catch (Throwable t) {
- log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext
- (t, siddhiAppContext)) + " Error in shutting down source '" + StringUtil.
- removeCRLFCharacters(source.getType()) + "' at '" +
- StringUtil.removeCRLFCharacters(source.getStreamDefinition().getId()) +
- "' on Siddhi App '" + siddhiAppContext.getName() + "'.", t);
- }
- }
- }
-
- for (Table table : tableMap.values()) {
- try {
- table.shutdown();
- } catch (Throwable t) {
- log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(t, siddhiAppContext)) +
- " Error in shutting down table '" +
- StringUtil.removeCRLFCharacters(table.getTableDefinition().getId()) + "' on Siddhi App '" +
- StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t);
- }
- }
-
- SinkHandlerManager sinkHandlerManager = siddhiAppContext.getSiddhiContext().getSinkHandlerManager();
- for (List sinks : sinkMap.values()) {
- for (Sink sink : sinks) {
- try {
- if (sinkHandlerManager != null) {
- sinkHandlerManager.unregisterSinkHandler(sink.getHandler().getId());
- }
- sink.shutdown();
- } catch (Throwable t) {
- log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext
- (t, siddhiAppContext)) + " Error in shutting down sink '" + StringUtil.
- removeCRLFCharacters(sink.getType()) + "' at '" + StringUtil.removeCRLFCharacters(sink.
- getStreamDefinition().getId()) + "' on Siddhi App '" +
- StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t);
- }
- }
- }
-
- for (Table table : tableMap.values()) {
- RecordTableHandlerManager recordTableHandlerManager = siddhiAppContext.getSiddhiContext().
- getRecordTableHandlerManager();
- if (recordTableHandlerManager != null) {
- String elementId = null;
- RecordTableHandler recordTableHandler = table.getHandler();
- if (recordTableHandler != null) {
- elementId = recordTableHandler.getId();
- }
- if (elementId != null) {
- recordTableHandlerManager.unregisterRecordTableHandler(elementId);
- }
- }
- table.shutdown();
- }
-
- for (ExternalReferencedHolder externalReferencedHolder : siddhiAppContext.getExternalReferencedHolders()) {
- try {
- externalReferencedHolder.stop();
- } catch (Throwable t) {
- log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(t, siddhiAppContext)) +
- " Error while stopping ExternalReferencedHolder '" +
- StringUtil.removeCRLFCharacters(externalReferencedHolder.toString()) + "' down Siddhi app '" +
- StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t);
- }
- }
- inputManager.disconnect();
-
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
-
- }
- for (StreamJunction streamJunction : streamJunctionMap.values()) {
- streamJunction.stopProcessing();
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
-
- }
- siddhiAppContext.getScheduledExecutorService().shutdownNow();
- siddhiAppContext.getExecutorService().shutdownNow();
-
- }
- }, "Siddhi-SiddhiApp-" + siddhiAppContext.getName() + "-Shutdown-Cleaner");
- thread.start();
-
- if (siddhiAppRuntimeMap != null) {
- siddhiAppRuntimeMap.remove(siddhiAppContext.getName());
- }
-
- if (siddhiAppContext.getStatisticsManager() != null) {
- if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.OFF) != 0) {
- siddhiAppContext.getStatisticsManager().stopReporting();
- }
- siddhiAppContext.getStatisticsManager().cleanup();
- }
- running = false;
- runningWithoutSources = false;
- }
-
- public synchronized SiddhiDebugger debug() {
- siddhiDebugger = new SiddhiDebugger(siddhiAppContext);
- List streamRuntime = new ArrayList<>();
- List streamCallbacks = new ArrayList<>();
- for (QueryRuntime queryRuntime : queryProcessorMap.values()) {
- streamRuntime.add(queryRuntime.getStreamRuntime());
- streamCallbacks.add(queryRuntime.getOutputCallback());
- }
- for (StreamRuntime streamRuntime1 : streamRuntime) {
- for (SingleStreamRuntime singleStreamRuntime : streamRuntime1.getSingleStreamRuntimes()) {
- singleStreamRuntime.getProcessStreamReceiver().setSiddhiDebugger(siddhiDebugger);
- }
- }
- for (OutputCallback callback : streamCallbacks) {
- callback.setSiddhiDebugger(siddhiDebugger);
- }
- start();
- return siddhiDebugger;
- }
-
- public PersistenceReference persist() {
- try {
- // first, pause all the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::pause));
- // take snapshots of execution units
- if (siddhiAppContext.getSiddhiContext().getPersistenceStore() != null) {
- return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().fullSnapshot(),
- siddhiAppContext);
- } else {
- return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().incrementalSnapshot(),
- siddhiAppContext);
- }
- } finally {
- // at the end, resume the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::resume));
- }
- }
-
- public byte[] snapshot() {
- try {
- // first, pause all the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::pause));
- // take snapshots of execution units
- return siddhiAppContext.getSnapshotService().fullSnapshot();
- } finally {
- // at the end, resume the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::resume));
- }
- }
-
- public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException {
- try {
- // first, pause all the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::pause));
- // start the restoring process
- siddhiAppContext.getSnapshotService().restore(snapshot);
- } finally {
- // at the end, resume the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::resume));
- }
- }
-
- public void restoreRevision(String revision) throws CannotRestoreSiddhiAppStateException {
- try {
- // first, pause all the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::pause));
- // start the restoring process
- siddhiAppContext.getSnapshotService().restoreRevision(revision);
- } finally {
- // at the end, resume the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::resume));
- }
- }
-
- public String restoreLastRevision() throws CannotRestoreSiddhiAppStateException {
- String revision;
- try {
- // first, pause all the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::pause));
- // start the restoring process
- revision = siddhiAppContext.getSnapshotService().restoreLastRevision();
- } finally {
- // at the end, resume the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::resume));
- }
- return revision;
- }
-
- public void clearAllRevisions() throws CannotClearSiddhiAppStateException {
- try {
- // first, pause all the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::pause));
- // start the restoring process
- siddhiAppContext.getSnapshotService().clearAllRevisions();
- } finally {
- // at the end, resume the event sources
- sourceMap.values().forEach(list -> list.forEach(Source::resume));
- }
- }
-
- private void monitorQueryMemoryUsage() {
- memoryUsageTracker = siddhiAppContext
- .getSiddhiContext()
- .getStatisticsConfiguration()
- .getFactory()
- .createMemoryUsageTracker(siddhiAppContext.getStatisticsManager());
- for (Map.Entry entry : queryProcessorMap.entrySet()) {
- QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(),
- SiddhiConstants.METRIC_INFIX_QUERIES, siddhiAppContext, memoryUsageTracker);
- }
- for (PartitionRuntime partitionRuntime : partitionMap.values()) {
- partitionRuntime.setMemoryUsageTracker(memoryUsageTracker);
- }
- for (Map.Entry entry : tableMap.entrySet()) {
- QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(),
- SiddhiConstants.METRIC_INFIX_TABLES, siddhiAppContext, memoryUsageTracker);
- }
- for (Map.Entry entry : windowMap.entrySet()) {
- QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(),
- SiddhiConstants.METRIC_INFIX_WINDOWS, siddhiAppContext, memoryUsageTracker);
- }
- for (Map.Entry entry : aggregationMap.entrySet()) {
- QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(),
- SiddhiConstants.METRIC_INFIX_AGGREGATIONS, siddhiAppContext, memoryUsageTracker);
- }
- }
-
- private void monitorBufferedEvents() {
- bufferedEventsTracker = siddhiAppContext
- .getSiddhiContext()
- .getStatisticsConfiguration()
- .getFactory()
- .createBufferSizeTracker(siddhiAppContext.getStatisticsManager());
- for (Map.Entry entry : streamJunctionMap.entrySet()) {
- registerForBufferedEvents(entry);
- }
- for (Map.Entry entry : partitionMap.entrySet()) {
- ConcurrentMap streamJunctionMap = ((PartitionRuntime) entry.getValue())
- .getLocalStreamJunctionMap();
- for (Map.Entry streamJunctionEntry : streamJunctionMap.entrySet()) {
- registerForBufferedEvents(streamJunctionEntry);
- }
- }
- }
-
- private void registerForBufferedEvents(Map.Entry entry) {
- if (entry.getValue().containsBufferedEvents()) {
- String metricName = siddhiAppContext.getSiddhiContext().getStatisticsConfiguration().getMetricPrefix() +
- SiddhiConstants.METRIC_DELIMITER + SiddhiConstants.METRIC_INFIX_SIDDHI_APPS +
- SiddhiConstants.METRIC_DELIMITER + getName() + SiddhiConstants.METRIC_DELIMITER +
- SiddhiConstants.METRIC_INFIX_SIDDHI + SiddhiConstants.METRIC_DELIMITER +
- SiddhiConstants.METRIC_INFIX_STREAMS + SiddhiConstants.METRIC_DELIMITER +
- entry.getKey() + SiddhiConstants.METRIC_DELIMITER + "size";
- boolean matchExist = false;
- for (String regex : siddhiAppContext.getIncludedMetrics()) {
- if (metricName.matches(regex)) {
- matchExist = true;
- break;
- }
- }
- if (matchExist) {
- bufferedEventsTracker.registerEventBufferHolder(entry.getValue(), metricName);
- }
- }
- }
-
- public void handleExceptionWith(ExceptionHandler