diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml index 977f66c05e..4b865b7264 100644 --- a/findbugs-exclude.xml +++ b/findbugs-exclude.xml @@ -97,7 +97,7 @@ - + diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntime.java index ee8044b25a..4c4aa8969d 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntime.java @@ -19,793 +19,153 @@ package io.siddhi.core; import com.lmax.disruptor.ExceptionHandler; -import io.siddhi.core.aggregation.AggregationRuntime; -import io.siddhi.core.config.SiddhiAppContext; import io.siddhi.core.debugger.SiddhiDebugger; import io.siddhi.core.event.Event; import io.siddhi.core.exception.CannotClearSiddhiAppStateException; import io.siddhi.core.exception.CannotRestoreSiddhiAppStateException; -import io.siddhi.core.exception.DefinitionNotExistException; -import io.siddhi.core.exception.QueryNotExistException; -import io.siddhi.core.exception.SiddhiAppRuntimeException; -import io.siddhi.core.exception.StoreQueryCreationException; import io.siddhi.core.partition.PartitionRuntime; import io.siddhi.core.query.QueryRuntime; -import io.siddhi.core.query.StoreQueryRuntime; -import io.siddhi.core.query.input.stream.StreamRuntime; -import io.siddhi.core.query.input.stream.single.SingleStreamRuntime; -import io.siddhi.core.query.output.callback.OutputCallback; import io.siddhi.core.query.output.callback.QueryCallback; -import io.siddhi.core.stream.StreamJunction; import io.siddhi.core.stream.input.InputHandler; -import io.siddhi.core.stream.input.InputManager; import io.siddhi.core.stream.input.source.Source; -import io.siddhi.core.stream.input.source.SourceHandlerManager; import io.siddhi.core.stream.output.StreamCallback; import io.siddhi.core.stream.output.sink.Sink; -import io.siddhi.core.stream.output.sink.SinkCallback; -import io.siddhi.core.stream.output.sink.SinkHandlerManager; import io.siddhi.core.table.Table; -import io.siddhi.core.table.record.RecordTableHandler; -import io.siddhi.core.table.record.RecordTableHandlerManager; -import io.siddhi.core.util.ExceptionUtil; -import io.siddhi.core.util.Scheduler; -import io.siddhi.core.util.SiddhiConstants; -import io.siddhi.core.util.StringUtil; -import io.siddhi.core.util.extension.holder.ExternalReferencedHolder; -import io.siddhi.core.util.parser.StoreQueryParser; -import io.siddhi.core.util.parser.helper.QueryParserHelper; -import io.siddhi.core.util.persistence.util.PersistenceHelper; +import io.siddhi.core.trigger.Trigger; import io.siddhi.core.util.snapshot.PersistenceReference; -import io.siddhi.core.util.statistics.BufferedEventsTracker; -import io.siddhi.core.util.statistics.LatencyTracker; -import io.siddhi.core.util.statistics.MemoryUsageTracker; import io.siddhi.core.util.statistics.metrics.Level; import io.siddhi.core.window.Window; +import io.siddhi.query.api.SiddhiApp; import io.siddhi.query.api.definition.AbstractDefinition; import io.siddhi.query.api.definition.AggregationDefinition; import io.siddhi.query.api.definition.Attribute; import io.siddhi.query.api.definition.StreamDefinition; import io.siddhi.query.api.definition.TableDefinition; import io.siddhi.query.api.definition.WindowDefinition; -import io.siddhi.query.api.exception.SiddhiAppContextException; import io.siddhi.query.api.execution.query.StoreQuery; -import io.siddhi.query.compiler.SiddhiCompiler; -import org.apache.log4j.Logger; import java.beans.ExceptionListener; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; -import java.util.stream.Collectors; /** * Keep streamDefinitions, partitionRuntimes, queryRuntimes of an SiddhiApp and streamJunctions and inputHandlers used. */ -public class SiddhiAppRuntime { - private static final Logger log = Logger.getLogger(SiddhiAppRuntime.class); - private final Map windowMap; - private final Map> sourceMap; - private final Map> sinkMap; - private ConcurrentMap aggregationMap; - private Map streamDefinitionMap = - new ConcurrentHashMap(); // Contains stream definition. - private Map tableDefinitionMap = - new ConcurrentHashMap(); // Contains table definition. - private Map windowDefinitionMap = - new ConcurrentHashMap(); // Contains window definition. - private Map aggregationDefinitionMap = - new ConcurrentHashMap(); // Contains aggregation definition. - private InputManager inputManager; - private Map queryProcessorMap = - Collections.synchronizedMap(new LinkedHashMap()); - private Map streamJunctionMap = - new ConcurrentHashMap(); // Contains stream junctions. - private Map tableMap = new ConcurrentHashMap(); // Contains event tables. - private Map partitionMap = - new ConcurrentHashMap(); // Contains partitions. - private LinkedHashMap storeQueryRuntimeMap = - new LinkedHashMap<>(); // Contains partitions. - private SiddhiAppContext siddhiAppContext; - private Map siddhiAppRuntimeMap; - private MemoryUsageTracker memoryUsageTracker; - private BufferedEventsTracker bufferedEventsTracker; - private LatencyTracker storeQueryLatencyTracker; - private SiddhiDebugger siddhiDebugger; - private boolean running = false; - private boolean runningWithoutSources = false; - private Future futureIncrementalPersistor; - private boolean incrementalDataPurging = true; - - - public SiddhiAppRuntime(Map streamDefinitionMap, - Map tableDefinitionMap, - Map windowDefinitionMap, - Map aggregationDefinitionMap, - InputManager inputManager, - Map queryProcessorMap, - Map streamJunctionMap, - Map tableMap, - Map windowMap, - ConcurrentMap aggregationMap, - Map> sourceMap, - Map> sinkMap, - Map partitionMap, - SiddhiAppContext siddhiAppContext, - Map siddhiAppRuntimeMap) { - this.streamDefinitionMap = streamDefinitionMap; - this.tableDefinitionMap = tableDefinitionMap; - this.windowDefinitionMap = windowDefinitionMap; - this.aggregationDefinitionMap = aggregationDefinitionMap; - this.inputManager = inputManager; - this.queryProcessorMap = queryProcessorMap; - this.streamJunctionMap = streamJunctionMap; - this.tableMap = tableMap; - this.windowMap = windowMap; - this.aggregationMap = aggregationMap; - this.sourceMap = sourceMap; - this.sinkMap = sinkMap; - this.partitionMap = partitionMap; - this.siddhiAppContext = siddhiAppContext; - this.siddhiAppRuntimeMap = siddhiAppRuntimeMap; - if (siddhiAppContext.getStatisticsManager() != null) { - monitorQueryMemoryUsage(); - monitorBufferedEvents(); - storeQueryLatencyTracker = QueryParserHelper.createLatencyTracker(siddhiAppContext, "query", - SiddhiConstants.METRIC_INFIX_STORE_QUERIES, null); - } - - for (Map.Entry> sinkEntries : sinkMap.entrySet()) { - addCallback(sinkEntries.getKey(), - new SinkCallback(sinkEntries.getValue(), streamDefinitionMap.get(sinkEntries.getKey()))); - } - for (Map.Entry> sourceEntries : sourceMap.entrySet()) { - InputHandler inputHandler = getInputHandler(sourceEntries.getKey()); - for (Source source : sourceEntries.getValue()) { - source.getMapper().setInputHandler(inputHandler); - } - } - } - - public String getName() { - return siddhiAppContext.getName(); - } +public interface SiddhiAppRuntime { + + + String getName(); + + SiddhiApp getSiddhiApp(); /** * Get the stream definition map. * * @return Map of {@link StreamDefinition}s. */ - public Map getStreamDefinitionMap() { - return streamDefinitionMap.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> (StreamDefinition) e.getValue())); - } + Map getStreamDefinitionMap(); /** * Get the table definition map. * * @return Map of {@link TableDefinition}s. */ - public Map getTableDefinitionMap() { - return tableDefinitionMap.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> (TableDefinition) e.getValue())); - } + Map getTableDefinitionMap(); /** * Get the window definition map. * * @return Map of {@link WindowDefinition}s. */ - public Map getWindowDefinitionMap() { - return windowDefinitionMap.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> (WindowDefinition) e.getValue())); - } + Map getWindowDefinitionMap(); /** * Get the aggregation definition map. * * @return Map of {@link AggregationDefinition}s. */ - public Map getAggregationDefinitionMap() { - return aggregationDefinitionMap.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> (AggregationDefinition) e.getValue())); - } + Map getAggregationDefinitionMap(); /** * Get the names of the available queries. * * @return string set of query names. */ - public Set getQueryNames() { - return queryProcessorMap.keySet(); - } - - public Map> getPartitionedInnerStreamDefinitionMap() { - Map> innerStreams = new HashMap<>(); - for (PartitionRuntime partition : partitionMap.values()) { - innerStreams.put(partition.getPartitionName(), partition.getLocalStreamDefinitionMap()); - } - return innerStreams; - } - - public void addCallback(String streamId, StreamCallback streamCallback) { - streamCallback.setStreamId(streamId); - StreamJunction streamJunction = streamJunctionMap.get(streamId); - if (streamJunction == null) { - throw new DefinitionNotExistException("No stream found with name: " + streamId); - } - streamCallback.setStreamDefinition(streamDefinitionMap.get(streamId)); - streamCallback.setContext(siddhiAppContext); - streamJunction.subscribe(streamCallback); - } - - public void addCallback(String queryName, QueryCallback callback) { - callback.setContext(siddhiAppContext); - QueryRuntime queryRuntime = queryProcessorMap.get(queryName); - if (queryRuntime == null) { - throw new QueryNotExistException("No query found with name: " + queryName); - } - callback.setQuery(queryRuntime.getQuery()); - queryRuntime.addCallback(callback); - } - - public Event[] query(String storeQuery) { - return query(SiddhiCompiler.parseStoreQuery(storeQuery), storeQuery); - } - - public Event[] query(StoreQuery storeQuery) { - return query(storeQuery, null); - } - - private Event[] query(StoreQuery storeQuery, String storeQueryString) { - try { - if (Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0 && - storeQueryLatencyTracker != null) { - storeQueryLatencyTracker.markIn(); - } - StoreQueryRuntime storeQueryRuntime; - synchronized (this) { - storeQueryRuntime = storeQueryRuntimeMap.remove(storeQuery); - if (storeQueryRuntime == null) { - storeQueryRuntime = StoreQueryParser.parse(storeQuery, siddhiAppContext, tableMap, windowMap, - aggregationMap); - } else { - storeQueryRuntime.reset(); - } - storeQueryRuntimeMap.put(storeQuery, storeQueryRuntime); - if (storeQueryRuntimeMap.size() > 50) { - Iterator i = storeQueryRuntimeMap.entrySet().iterator(); - if (i.hasNext()) { - i.next(); - i.remove(); - } - } - } - return storeQueryRuntime.execute(); - } catch (RuntimeException e) { - if (e instanceof SiddhiAppContextException) { - throw new StoreQueryCreationException(((SiddhiAppContextException) e).getMessageWithOutContext(), e, - ((SiddhiAppContextException) e).getQueryContextStartIndex(), - ((SiddhiAppContextException) e).getQueryContextEndIndex(), null, storeQueryString); - } - throw new StoreQueryCreationException(e.getMessage(), e); - } finally { - if (Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0 && - storeQueryLatencyTracker != null) { - storeQueryLatencyTracker.markOut(); - } - } - } - - public Attribute[] getStoreQueryOutputAttributes(String storeQuery) { - return getStoreQueryOutputAttributes(SiddhiCompiler.parseStoreQuery(storeQuery), storeQuery); - } - - public Attribute[] getStoreQueryOutputAttributes(StoreQuery storeQuery) { - return getStoreQueryOutputAttributes(storeQuery, null); - } + Set getQueryNames(); + Map> getPartitionedInnerStreamDefinitionMap(); - /** - * This method get the storeQuery and return the corresponding output and its types. - * - * @param storeQuery this storeQuery is processed and get the output attributes. - * @param storeQueryString this passed to report errors with context if there are any. - * @return List of output attributes - */ - private Attribute[] getStoreQueryOutputAttributes(StoreQuery storeQuery, String storeQueryString) { - try { - StoreQueryRuntime storeQueryRuntime = storeQueryRuntimeMap.get(storeQuery); - if (storeQueryRuntime == null) { - storeQueryRuntime = StoreQueryParser.parse(storeQuery, siddhiAppContext, tableMap, windowMap, - aggregationMap); - storeQueryRuntimeMap.put(storeQuery, storeQueryRuntime); - } - return storeQueryRuntime.getStoreQueryOutputAttributes(); - } catch (RuntimeException e) { - if (e instanceof SiddhiAppContextException) { - throw new StoreQueryCreationException(((SiddhiAppContextException) e).getMessageWithOutContext(), e, - ((SiddhiAppContextException) e).getQueryContextStartIndex(), - ((SiddhiAppContextException) e).getQueryContextEndIndex(), null, siddhiAppContext - .getSiddhiAppString()); - } - throw new StoreQueryCreationException(e.getMessage(), e); - } - } - - public InputHandler getInputHandler(String streamId) { - return inputManager.getInputHandler(streamId); - } - - public Collection> getSources() { - return sourceMap.values(); - } - - public Collection> getSinks() { - return sinkMap.values(); - } - - public Collection getTables() { - return tableMap.values(); - } - - public synchronized void start() { - if (running) { - log.warn("Error calling start() for Siddhi App '" + siddhiAppContext.getName() + "', " + - "SiddhiApp already started."); - return; - } - if (!runningWithoutSources) { - startWithoutSources(); - } - if (runningWithoutSources) { - startSources(); - } - } - - public synchronized void startWithoutSources() { - if (running || runningWithoutSources) { - log.warn("Error calling startWithoutSources() for Siddhi App '" + siddhiAppContext.getName() + "', " + - "SiddhiApp already started."); - } else { - try { - memoryUsageTracker.disableMemoryUsageMetrics(); - if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.OFF) != 0 && - siddhiAppContext.getStatisticsManager() != null) { - if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.DETAIL) == 0) { - memoryUsageTracker.enableMemoryUsageMetrics(); - } - siddhiAppContext.getStatisticsManager().startReporting(); - } - for (ExternalReferencedHolder externalReferencedHolder : - siddhiAppContext.getExternalReferencedHolders()) { - externalReferencedHolder.start(); - } - for (List sinks : sinkMap.values()) { - for (Sink sink : sinks) { - sink.connectWithRetry(); - } - } - for (Table table : tableMap.values()) { - table.connectWithRetry(); - } - for (StreamJunction streamJunction : streamJunctionMap.values()) { - streamJunction.startProcessing(); - } - if (incrementalDataPurging) { - for (AggregationRuntime aggregationRuntime : aggregationMap.values()) { - aggregationRuntime.startPurging(); - } - } - runningWithoutSources = true; - } catch (Throwable t) { - log.error("Error starting Siddhi App '" + siddhiAppContext.getName() + "', " + - "triggering shutdown process. " + t.getMessage()); - try { - shutdown(); - } catch (Throwable t1) { - log.error("Error shutting down partially started Siddhi App '" + siddhiAppContext.getName() + "', " - + t1.getMessage()); - } - } - } - } - - public void setPurgingEnabled(boolean purgingEnabled) { - this.incrementalDataPurging = purgingEnabled; - } - - public void startSources() { - if (running) { - log.warn("Error calling startSources() for Siddhi App '" + siddhiAppContext.getName() + "', " + - "SiddhiApp already started with the sources."); - return; - } - if (!runningWithoutSources) { - throw new SiddhiAppRuntimeException("Cannot call startSources() without calling startWithoutSources() " + - "for Siddhi App '" + siddhiAppContext.getName() + "'"); - } else { - try { - for (List sources : sourceMap.values()) { - for (Source source : sources) { - source.connectWithRetry(); - } - } - running = true; - runningWithoutSources = false; - } catch (Throwable t) { - log.error("Error starting Siddhi App '" + siddhiAppContext.getName() + "', " + - "triggering shutdown process. " + t.getMessage()); - try { - shutdown(); - } catch (Throwable t1) { - log.error("Error shutting down partially started Siddhi App '" + siddhiAppContext.getName() + "', " - + t1.getMessage()); - } - } - } - } - - public synchronized void shutdown() { - SourceHandlerManager sourceHandlerManager = siddhiAppContext.getSiddhiContext().getSourceHandlerManager(); - for (List sources : sourceMap.values()) { - for (Source source : sources) { - try { - if (sourceHandlerManager != null) { - sourceHandlerManager.unregisterSourceHandler(source.getMapper().getHandler(). - getId()); - } - source.shutdown(); - } catch (Throwable t) { - log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext - (t, siddhiAppContext)) + " Error in shutting down source '" + StringUtil. - removeCRLFCharacters(source.getType()) + "' at '" + - StringUtil.removeCRLFCharacters(source.getStreamDefinition().getId()) + - "' on Siddhi App '" + siddhiAppContext.getName() + "'.", t); - } - } - } - - for (Table table : tableMap.values()) { - try { - table.shutdown(); - } catch (Throwable t) { - log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(t, siddhiAppContext)) + - " Error in shutting down table '" + - StringUtil.removeCRLFCharacters(table.getTableDefinition().getId()) + "' on Siddhi App '" + - StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t); - } - } - - SinkHandlerManager sinkHandlerManager = siddhiAppContext.getSiddhiContext().getSinkHandlerManager(); - for (List sinks : sinkMap.values()) { - for (Sink sink : sinks) { - try { - if (sinkHandlerManager != null) { - sinkHandlerManager.unregisterSinkHandler(sink.getHandler().getId()); - } - sink.shutdown(); - } catch (Throwable t) { - log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext - (t, siddhiAppContext)) + " Error in shutting down sink '" + StringUtil. - removeCRLFCharacters(sink.getType()) + "' at '" + StringUtil.removeCRLFCharacters(sink. - getStreamDefinition().getId()) + "' on Siddhi App '" + - StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t); - } - } - } - - for (Table table : tableMap.values()) { - RecordTableHandlerManager recordTableHandlerManager = siddhiAppContext.getSiddhiContext(). - getRecordTableHandlerManager(); - if (recordTableHandlerManager != null) { - String elementId = null; - RecordTableHandler recordTableHandler = table.getHandler(); - if (recordTableHandler != null) { - elementId = recordTableHandler.getId(); - } - if (elementId != null) { - recordTableHandlerManager.unregisterRecordTableHandler(elementId); - } - } - table.shutdown(); - } - - for (ExternalReferencedHolder externalReferencedHolder : siddhiAppContext.getExternalReferencedHolders()) { - try { - externalReferencedHolder.stop(); - } catch (Throwable t) { - log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(t, siddhiAppContext)) + - " Error while stopping ExternalReferencedHolder '" + - StringUtil.removeCRLFCharacters(externalReferencedHolder.toString()) + "' down Siddhi app '" + - StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t); - } - } - inputManager.disconnect(); - - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - - } - for (StreamJunction streamJunction : streamJunctionMap.values()) { - streamJunction.stopProcessing(); - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - - } - siddhiAppContext.getScheduledExecutorService().shutdownNow(); - siddhiAppContext.getExecutorService().shutdownNow(); - - } - }, "Siddhi-SiddhiApp-" + siddhiAppContext.getName() + "-Shutdown-Cleaner"); - thread.start(); - - if (siddhiAppRuntimeMap != null) { - siddhiAppRuntimeMap.remove(siddhiAppContext.getName()); - } - - if (siddhiAppContext.getStatisticsManager() != null) { - if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.OFF) != 0) { - siddhiAppContext.getStatisticsManager().stopReporting(); - } - siddhiAppContext.getStatisticsManager().cleanup(); - } - running = false; - runningWithoutSources = false; - } - - public synchronized SiddhiDebugger debug() { - siddhiDebugger = new SiddhiDebugger(siddhiAppContext); - List streamRuntime = new ArrayList<>(); - List streamCallbacks = new ArrayList<>(); - for (QueryRuntime queryRuntime : queryProcessorMap.values()) { - streamRuntime.add(queryRuntime.getStreamRuntime()); - streamCallbacks.add(queryRuntime.getOutputCallback()); - } - for (StreamRuntime streamRuntime1 : streamRuntime) { - for (SingleStreamRuntime singleStreamRuntime : streamRuntime1.getSingleStreamRuntimes()) { - singleStreamRuntime.getProcessStreamReceiver().setSiddhiDebugger(siddhiDebugger); - } - } - for (OutputCallback callback : streamCallbacks) { - callback.setSiddhiDebugger(siddhiDebugger); - } - start(); - return siddhiDebugger; - } - - public PersistenceReference persist() { - try { - // first, pause all the event sources - sourceMap.values().forEach(list -> list.forEach(Source::pause)); - // take snapshots of execution units - if (siddhiAppContext.getSiddhiContext().getPersistenceStore() != null) { - return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().fullSnapshot(), - siddhiAppContext); - } else { - return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().incrementalSnapshot(), - siddhiAppContext); - } - } finally { - // at the end, resume the event sources - sourceMap.values().forEach(list -> list.forEach(Source::resume)); - } - } - - public byte[] snapshot() { - try { - // first, pause all the event sources - sourceMap.values().forEach(list -> list.forEach(Source::pause)); - // take snapshots of execution units - return siddhiAppContext.getSnapshotService().fullSnapshot(); - } finally { - // at the end, resume the event sources - sourceMap.values().forEach(list -> list.forEach(Source::resume)); - } - } - - public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException { - try { - // first, pause all the event sources - sourceMap.values().forEach(list -> list.forEach(Source::pause)); - // start the restoring process - siddhiAppContext.getSnapshotService().restore(snapshot); - } finally { - // at the end, resume the event sources - sourceMap.values().forEach(list -> list.forEach(Source::resume)); - } - } - - public void restoreRevision(String revision) throws CannotRestoreSiddhiAppStateException { - try { - // first, pause all the event sources - sourceMap.values().forEach(list -> list.forEach(Source::pause)); - // start the restoring process - siddhiAppContext.getSnapshotService().restoreRevision(revision); - } finally { - // at the end, resume the event sources - sourceMap.values().forEach(list -> list.forEach(Source::resume)); - } - } - - public String restoreLastRevision() throws CannotRestoreSiddhiAppStateException { - String revision; - try { - // first, pause all the event sources - sourceMap.values().forEach(list -> list.forEach(Source::pause)); - // start the restoring process - revision = siddhiAppContext.getSnapshotService().restoreLastRevision(); - } finally { - // at the end, resume the event sources - sourceMap.values().forEach(list -> list.forEach(Source::resume)); - } - return revision; - } - - public void clearAllRevisions() throws CannotClearSiddhiAppStateException { - try { - // first, pause all the event sources - sourceMap.values().forEach(list -> list.forEach(Source::pause)); - // start the restoring process - siddhiAppContext.getSnapshotService().clearAllRevisions(); - } finally { - // at the end, resume the event sources - sourceMap.values().forEach(list -> list.forEach(Source::resume)); - } - } - - private void monitorQueryMemoryUsage() { - memoryUsageTracker = siddhiAppContext - .getSiddhiContext() - .getStatisticsConfiguration() - .getFactory() - .createMemoryUsageTracker(siddhiAppContext.getStatisticsManager()); - for (Map.Entry entry : queryProcessorMap.entrySet()) { - QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(), - SiddhiConstants.METRIC_INFIX_QUERIES, siddhiAppContext, memoryUsageTracker); - } - for (PartitionRuntime partitionRuntime : partitionMap.values()) { - partitionRuntime.setMemoryUsageTracker(memoryUsageTracker); - } - for (Map.Entry entry : tableMap.entrySet()) { - QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(), - SiddhiConstants.METRIC_INFIX_TABLES, siddhiAppContext, memoryUsageTracker); - } - for (Map.Entry entry : windowMap.entrySet()) { - QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(), - SiddhiConstants.METRIC_INFIX_WINDOWS, siddhiAppContext, memoryUsageTracker); - } - for (Map.Entry entry : aggregationMap.entrySet()) { - QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(), - SiddhiConstants.METRIC_INFIX_AGGREGATIONS, siddhiAppContext, memoryUsageTracker); - } - } - - private void monitorBufferedEvents() { - bufferedEventsTracker = siddhiAppContext - .getSiddhiContext() - .getStatisticsConfiguration() - .getFactory() - .createBufferSizeTracker(siddhiAppContext.getStatisticsManager()); - for (Map.Entry entry : streamJunctionMap.entrySet()) { - registerForBufferedEvents(entry); - } - for (Map.Entry entry : partitionMap.entrySet()) { - ConcurrentMap streamJunctionMap = ((PartitionRuntime) entry.getValue()) - .getLocalStreamJunctionMap(); - for (Map.Entry streamJunctionEntry : streamJunctionMap.entrySet()) { - registerForBufferedEvents(streamJunctionEntry); - } - } - } - - private void registerForBufferedEvents(Map.Entry entry) { - if (entry.getValue().containsBufferedEvents()) { - String metricName = siddhiAppContext.getSiddhiContext().getStatisticsConfiguration().getMetricPrefix() + - SiddhiConstants.METRIC_DELIMITER + SiddhiConstants.METRIC_INFIX_SIDDHI_APPS + - SiddhiConstants.METRIC_DELIMITER + getName() + SiddhiConstants.METRIC_DELIMITER + - SiddhiConstants.METRIC_INFIX_SIDDHI + SiddhiConstants.METRIC_DELIMITER + - SiddhiConstants.METRIC_INFIX_STREAMS + SiddhiConstants.METRIC_DELIMITER + - entry.getKey() + SiddhiConstants.METRIC_DELIMITER + "size"; - boolean matchExist = false; - for (String regex : siddhiAppContext.getIncludedMetrics()) { - if (metricName.matches(regex)) { - matchExist = true; - break; - } - } - if (matchExist) { - bufferedEventsTracker.registerEventBufferHolder(entry.getValue(), metricName); - } - } - } - - public void handleExceptionWith(ExceptionHandler exceptionHandler) { - siddhiAppContext.setDisruptorExceptionHandler(exceptionHandler); - } - - public void handleRuntimeExceptionWith(ExceptionListener exceptionListener) { - siddhiAppContext.setRuntimeExceptionListener(exceptionListener); - } + Collection> getSources(); + + Collection> getSinks(); + + Collection
getTables(); + + Collection getWindows(); + + Collection getTiggers(); + + Collection getQueries(); + + Collection getPartitions(); + + void addCallback(String streamId, StreamCallback streamCallback); + + void addCallback(String queryName, QueryCallback callback); + + Event[] query(String storeQuery); + + Event[] query(StoreQuery storeQuery); + + Attribute[] getStoreQueryOutputAttributes(String storeQuery); + + Attribute[] getStoreQueryOutputAttributes(StoreQuery storeQuery); + + InputHandler getInputHandler(String streamId); + + void setPurgingEnabled(boolean purgingEnabled); + + void start(); + + void startWithoutSources(); + + void startSources(); + + void shutdown(); + + SiddhiDebugger debug(); + + PersistenceReference persist(); + + byte[] snapshot(); + + void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException; + + void restoreRevision(String revision) throws CannotRestoreSiddhiAppStateException; + + String restoreLastRevision() throws CannotRestoreSiddhiAppStateException; + + void clearAllRevisions() throws CannotClearSiddhiAppStateException; + + void handleExceptionWith(ExceptionHandler exceptionHandler); + + void handleRuntimeExceptionWith(ExceptionListener exceptionListener); /** * Method to check the Siddhi App statistics level enabled. * * @return Level value of Siddhi App statistics state */ - public Level getStatisticsLevel() { - return siddhiAppContext.getRootMetricsLevel(); - } + Level getStatisticsLevel(); /** * To enable, disable and change Siddhi App statistics level on runtime. * * @param level whether statistics is OFF, BASIC or DETAIL */ - public void setStatisticsLevel(Level level) { - if (running && siddhiAppContext.getStatisticsManager() != null) { - - if (siddhiAppContext.getRootMetricsLevel().compareTo(level) == 0) { - if (level == Level.OFF) { - log.info("Siddhi App '" + getName() + "' statistics reporting is already disabled!"); - } else if (level == Level.BASIC || level == Level.DETAIL) { - log.info("Siddhi App '" + getName() + "' statistics reporting is already in " + level + " level!"); - } - } else { - if (level == Level.OFF) { - memoryUsageTracker.disableMemoryUsageMetrics(); - siddhiAppContext.setRootMetricsLevel(Level.OFF); - siddhiAppContext.getStatisticsManager().stopReporting(); - log.info("Siddhi App '" + getName() + "' statistics reporting stopped!"); - } else { - if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.OFF) == 0) { - siddhiAppContext.getStatisticsManager().startReporting(); - log.debug("Siddhi App '" + getName() + "' statistics reporting started!"); - } - if (level == Level.DETAIL) { - memoryUsageTracker.enableMemoryUsageMetrics(); - } - siddhiAppContext.setRootMetricsLevel(level); - log.info("Siddhi App '" + getName() + "' statistics reporting changed to: " + level.toString()); - } - } - } else { - if (running) { - log.debug("Siddhi App '" + getName() + "' statistics reporting not changed, " + - "as app has not started running!"); - } else { - log.debug("Siddhi App '" + getName() + "' statistics reporting not changed, as StatisticsManager" + - " is not defined!"); - } - } - } + void setStatisticsLevel(Level level); /** * To enable and disable Siddhi App playback mode on runtime along with optional parameters. @@ -814,23 +174,6 @@ public void setStatisticsLevel(Level level) { * @param idleTime * @param incrementInMilliseconds */ - public void enablePlayBack(boolean playBackEnabled, Long idleTime, Long incrementInMilliseconds) { - this.siddhiAppContext.setPlayback(playBackEnabled); - if (!playBackEnabled) { - for (Scheduler scheduler : siddhiAppContext.getSchedulerList()) { - scheduler.switchToLiveMode(); - } - } else { - if (idleTime != null && incrementInMilliseconds != null) { - //Only use if both values are present. Else defaults will be used which got assigned when creating - // the siddhi app runtimes if app contained playback information - this.siddhiAppContext.getTimestampGenerator().setIdleTime(idleTime); - this.siddhiAppContext.getTimestampGenerator().setIncrementInMilliseconds(incrementInMilliseconds); - } - - for (Scheduler scheduler : siddhiAppContext.getSchedulerList()) { - scheduler.switchToPlayBackMode(); - } - } - } + void enablePlayBack(boolean playBackEnabled, Long idleTime, Long incrementInMilliseconds); + } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java new file mode 100644 index 0000000000..261e2945b7 --- /dev/null +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java @@ -0,0 +1,867 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.core; + +import com.lmax.disruptor.ExceptionHandler; +import io.siddhi.core.aggregation.AggregationRuntime; +import io.siddhi.core.config.SiddhiAppContext; +import io.siddhi.core.debugger.SiddhiDebugger; +import io.siddhi.core.event.Event; +import io.siddhi.core.exception.CannotClearSiddhiAppStateException; +import io.siddhi.core.exception.CannotRestoreSiddhiAppStateException; +import io.siddhi.core.exception.DefinitionNotExistException; +import io.siddhi.core.exception.QueryNotExistException; +import io.siddhi.core.exception.SiddhiAppRuntimeException; +import io.siddhi.core.exception.StoreQueryCreationException; +import io.siddhi.core.partition.PartitionRuntime; +import io.siddhi.core.partition.PartitionRuntimeImpl; +import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.query.QueryRuntimeImpl; +import io.siddhi.core.query.StoreQueryRuntime; +import io.siddhi.core.query.input.stream.StreamRuntime; +import io.siddhi.core.query.input.stream.single.SingleStreamRuntime; +import io.siddhi.core.query.output.callback.OutputCallback; +import io.siddhi.core.query.output.callback.QueryCallback; +import io.siddhi.core.stream.StreamJunction; +import io.siddhi.core.stream.input.InputHandler; +import io.siddhi.core.stream.input.InputManager; +import io.siddhi.core.stream.input.source.Source; +import io.siddhi.core.stream.input.source.SourceHandlerManager; +import io.siddhi.core.stream.output.StreamCallback; +import io.siddhi.core.stream.output.sink.Sink; +import io.siddhi.core.stream.output.sink.SinkCallback; +import io.siddhi.core.stream.output.sink.SinkHandlerManager; +import io.siddhi.core.table.Table; +import io.siddhi.core.table.record.RecordTableHandler; +import io.siddhi.core.table.record.RecordTableHandlerManager; +import io.siddhi.core.trigger.Trigger; +import io.siddhi.core.util.ExceptionUtil; +import io.siddhi.core.util.Scheduler; +import io.siddhi.core.util.SiddhiConstants; +import io.siddhi.core.util.StringUtil; +import io.siddhi.core.util.extension.holder.ExternalReferencedHolder; +import io.siddhi.core.util.parser.StoreQueryParser; +import io.siddhi.core.util.parser.helper.QueryParserHelper; +import io.siddhi.core.util.persistence.util.PersistenceHelper; +import io.siddhi.core.util.snapshot.PersistenceReference; +import io.siddhi.core.util.statistics.BufferedEventsTracker; +import io.siddhi.core.util.statistics.LatencyTracker; +import io.siddhi.core.util.statistics.MemoryUsageTracker; +import io.siddhi.core.util.statistics.metrics.Level; +import io.siddhi.core.window.Window; +import io.siddhi.query.api.SiddhiApp; +import io.siddhi.query.api.definition.AbstractDefinition; +import io.siddhi.query.api.definition.AggregationDefinition; +import io.siddhi.query.api.definition.Attribute; +import io.siddhi.query.api.definition.StreamDefinition; +import io.siddhi.query.api.definition.TableDefinition; +import io.siddhi.query.api.definition.WindowDefinition; +import io.siddhi.query.api.exception.SiddhiAppContextException; +import io.siddhi.query.api.execution.query.StoreQuery; +import io.siddhi.query.compiler.SiddhiCompiler; +import org.apache.log4j.Logger; + +import java.beans.ExceptionListener; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +/** + * Keep streamDefinitions, partitionRuntimes, queryRuntimes of an SiddhiApp and streamJunctions and inputHandlers used. + */ +public class SiddhiAppRuntimeImpl implements SiddhiAppRuntime { + private static final Logger log = Logger.getLogger(SiddhiAppRuntimeImpl.class); + private final Map windowMap; + private final Map> sourceMap; + private final Map> sinkMap; + private ConcurrentMap aggregationMap; + private Map streamDefinitionMap = + new ConcurrentHashMap(); // Contains stream definition. + private Map tableDefinitionMap = + new ConcurrentHashMap(); // Contains table definition. + private Map windowDefinitionMap = + new ConcurrentHashMap(); // Contains window definition. + private Map aggregationDefinitionMap = + new ConcurrentHashMap(); // Contains aggregation definition. + private InputManager inputManager; + private Map queryProcessorMap = + Collections.synchronizedMap(new LinkedHashMap()); + private Map streamJunctionMap = + new ConcurrentHashMap(); // Contains stream junctions. + private Map tableMap = new ConcurrentHashMap(); // Contains event tables. + private Map partitionMap = + new ConcurrentHashMap(); // Contains partitions. + private LinkedHashMap storeQueryRuntimeMap = + new LinkedHashMap<>(); // Contains partitions. + private ConcurrentMap triggerMap; + private SiddhiAppContext siddhiAppContext; + private Map siddhiAppRuntimeMap; + private MemoryUsageTracker memoryUsageTracker; + private BufferedEventsTracker bufferedEventsTracker; + private LatencyTracker storeQueryLatencyTracker; + private SiddhiDebugger siddhiDebugger; + private boolean running = false; + private boolean runningWithoutSources = false; + private Future futureIncrementalPersistor; + private boolean incrementalDataPurging = true; + + + public SiddhiAppRuntimeImpl(Map streamDefinitionMap, + Map tableDefinitionMap, + Map windowDefinitionMap, + Map aggregationDefinitionMap, + InputManager inputManager, + Map queryProcessorMap, + Map streamJunctionMap, + Map tableMap, + Map windowMap, + ConcurrentMap aggregationMap, + Map> sourceMap, + Map> sinkMap, + Map partitionMap, + ConcurrentMap triggerMap, + SiddhiAppContext siddhiAppContext, + Map siddhiAppRuntimeMap) { + this.streamDefinitionMap = streamDefinitionMap; + this.tableDefinitionMap = tableDefinitionMap; + this.windowDefinitionMap = windowDefinitionMap; + this.aggregationDefinitionMap = aggregationDefinitionMap; + this.inputManager = inputManager; + this.queryProcessorMap = queryProcessorMap; + this.streamJunctionMap = streamJunctionMap; + this.tableMap = tableMap; + this.windowMap = windowMap; + this.aggregationMap = aggregationMap; + this.sourceMap = sourceMap; + this.sinkMap = sinkMap; + this.partitionMap = partitionMap; + this.triggerMap = triggerMap; + this.siddhiAppContext = siddhiAppContext; + this.siddhiAppRuntimeMap = siddhiAppRuntimeMap; + if (siddhiAppContext.getStatisticsManager() != null) { + monitorQueryMemoryUsage(); + monitorBufferedEvents(); + storeQueryLatencyTracker = QueryParserHelper.createLatencyTracker(siddhiAppContext, "query", + SiddhiConstants.METRIC_INFIX_STORE_QUERIES, null); + } + + for (Map.Entry> sinkEntries : sinkMap.entrySet()) { + addCallback(sinkEntries.getKey(), + new SinkCallback(sinkEntries.getValue(), streamDefinitionMap.get(sinkEntries.getKey()))); + } + for (Map.Entry> sourceEntries : sourceMap.entrySet()) { + InputHandler inputHandler = getInputHandler(sourceEntries.getKey()); + for (Source source : sourceEntries.getValue()) { + source.getMapper().setInputHandler(inputHandler); + } + } + } + + public String getName() { + return siddhiAppContext.getName(); + } + + /** + * Get the stream definition map. + * + * @return Map of {@link StreamDefinition}s. + */ + public Map getStreamDefinitionMap() { + return streamDefinitionMap.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> (StreamDefinition) e.getValue())); + } + + /** + * Get the table definition map. + * + * @return Map of {@link TableDefinition}s. + */ + public Map getTableDefinitionMap() { + return tableDefinitionMap.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> (TableDefinition) e.getValue())); + } + + /** + * Get the window definition map. + * + * @return Map of {@link WindowDefinition}s. + */ + public Map getWindowDefinitionMap() { + return windowDefinitionMap.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> (WindowDefinition) e.getValue())); + } + + /** + * Get the aggregation definition map. + * + * @return Map of {@link AggregationDefinition}s. + */ + public Map getAggregationDefinitionMap() { + return aggregationDefinitionMap.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> (AggregationDefinition) e.getValue())); + } + + /** + * Get the names of the available queries. + * + * @return string set of query names. + */ + public Set getQueryNames() { + return queryProcessorMap.keySet(); + } + + public Map> getPartitionedInnerStreamDefinitionMap() { + Map> innerStreams = new HashMap<>(); + for (PartitionRuntime partition : partitionMap.values()) { + innerStreams.put(partition.getPartitionName(), partition.getLocalStreamDefinitionMap()); + } + return innerStreams; + } + + public void addCallback(String streamId, StreamCallback streamCallback) { + streamCallback.setStreamId(streamId); + StreamJunction streamJunction = streamJunctionMap.get(streamId); + if (streamJunction == null) { + throw new DefinitionNotExistException("No stream found with name: " + streamId); + } + streamCallback.setStreamDefinition(streamDefinitionMap.get(streamId)); + streamCallback.setContext(siddhiAppContext); + streamJunction.subscribe(streamCallback); + } + + public void addCallback(String queryName, QueryCallback callback) { + callback.setContext(siddhiAppContext); + QueryRuntime queryRuntime = queryProcessorMap.get(queryName); + if (queryRuntime == null) { + throw new QueryNotExistException("No query found with name: " + queryName); + } + callback.setQuery(queryRuntime.getQuery()); + ((QueryRuntimeImpl) queryRuntime).addCallback(callback); + } + + public Event[] query(String storeQuery) { + return query(SiddhiCompiler.parseStoreQuery(storeQuery), storeQuery); + } + + public Event[] query(StoreQuery storeQuery) { + return query(storeQuery, null); + } + + private Event[] query(StoreQuery storeQuery, String storeQueryString) { + try { + if (Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0 && + storeQueryLatencyTracker != null) { + storeQueryLatencyTracker.markIn(); + } + StoreQueryRuntime storeQueryRuntime; + synchronized (this) { + storeQueryRuntime = storeQueryRuntimeMap.remove(storeQuery); + if (storeQueryRuntime == null) { + storeQueryRuntime = StoreQueryParser.parse(storeQuery, siddhiAppContext, tableMap, windowMap, + aggregationMap); + } else { + storeQueryRuntime.reset(); + } + storeQueryRuntimeMap.put(storeQuery, storeQueryRuntime); + if (storeQueryRuntimeMap.size() > 50) { + Iterator i = storeQueryRuntimeMap.entrySet().iterator(); + if (i.hasNext()) { + i.next(); + i.remove(); + } + } + } + return storeQueryRuntime.execute(); + } catch (RuntimeException e) { + if (e instanceof SiddhiAppContextException) { + throw new StoreQueryCreationException(((SiddhiAppContextException) e).getMessageWithOutContext(), e, + ((SiddhiAppContextException) e).getQueryContextStartIndex(), + ((SiddhiAppContextException) e).getQueryContextEndIndex(), null, storeQueryString); + } + throw new StoreQueryCreationException(e.getMessage(), e); + } finally { + if (Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0 && + storeQueryLatencyTracker != null) { + storeQueryLatencyTracker.markOut(); + } + } + } + + public Attribute[] getStoreQueryOutputAttributes(String storeQuery) { + return getStoreQueryOutputAttributes(SiddhiCompiler.parseStoreQuery(storeQuery), storeQuery); + } + + public Attribute[] getStoreQueryOutputAttributes(StoreQuery storeQuery) { + return getStoreQueryOutputAttributes(storeQuery, null); + } + + + /** + * This method get the storeQuery and return the corresponding output and its types. + * + * @param storeQuery this storeQuery is processed and get the output attributes. + * @param storeQueryString this passed to report errors with context if there are any. + * @return List of output attributes + */ + private Attribute[] getStoreQueryOutputAttributes(StoreQuery storeQuery, String storeQueryString) { + try { + StoreQueryRuntime storeQueryRuntime = storeQueryRuntimeMap.get(storeQuery); + if (storeQueryRuntime == null) { + storeQueryRuntime = StoreQueryParser.parse(storeQuery, siddhiAppContext, tableMap, windowMap, + aggregationMap); + storeQueryRuntimeMap.put(storeQuery, storeQueryRuntime); + } + return storeQueryRuntime.getStoreQueryOutputAttributes(); + } catch (RuntimeException e) { + if (e instanceof SiddhiAppContextException) { + throw new StoreQueryCreationException(((SiddhiAppContextException) e).getMessageWithOutContext(), e, + ((SiddhiAppContextException) e).getQueryContextStartIndex(), + ((SiddhiAppContextException) e).getQueryContextEndIndex(), null, siddhiAppContext + .getSiddhiAppString()); + } + throw new StoreQueryCreationException(e.getMessage(), e); + } + } + + public InputHandler getInputHandler(String streamId) { + return inputManager.getInputHandler(streamId); + } + + public Collection> getSources() { + return sourceMap.values(); + } + + public Collection> getSinks() { + return sinkMap.values(); + } + + public Collection
getTables() { + return tableMap.values(); + } + + @Override + public Collection getWindows() { + return windowMap.values(); + } + + @Override + public Collection getTiggers() { + return triggerMap.values(); + } + + public synchronized void start() { + if (running) { + log.warn("Error calling start() for Siddhi App '" + siddhiAppContext.getName() + "', " + + "SiddhiApp already started."); + return; + } + if (!runningWithoutSources) { + startWithoutSources(); + } + if (runningWithoutSources) { + startSources(); + } + } + + public synchronized void startWithoutSources() { + if (running || runningWithoutSources) { + log.warn("Error calling startWithoutSources() for Siddhi App '" + siddhiAppContext.getName() + "', " + + "SiddhiApp already started."); + } else { + try { + memoryUsageTracker.disableMemoryUsageMetrics(); + if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.OFF) != 0 && + siddhiAppContext.getStatisticsManager() != null) { + if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.DETAIL) == 0) { + memoryUsageTracker.enableMemoryUsageMetrics(); + } + siddhiAppContext.getStatisticsManager().startReporting(); + } + for (ExternalReferencedHolder externalReferencedHolder : + siddhiAppContext.getExternalReferencedHolders()) { + externalReferencedHolder.start(); + } + for (List sinks : sinkMap.values()) { + for (Sink sink : sinks) { + sink.connectWithRetry(); + } + } + for (Table table : tableMap.values()) { + table.connectWithRetry(); + } + for (StreamJunction streamJunction : streamJunctionMap.values()) { + streamJunction.startProcessing(); + } + if (incrementalDataPurging) { + for (AggregationRuntime aggregationRuntime : aggregationMap.values()) { + aggregationRuntime.startPurging(); + } + } + runningWithoutSources = true; + } catch (Throwable t) { + log.error("Error starting Siddhi App '" + siddhiAppContext.getName() + "', " + + "triggering shutdown process. " + t.getMessage()); + try { + shutdown(); + } catch (Throwable t1) { + log.error("Error shutting down partially started Siddhi App '" + siddhiAppContext.getName() + "', " + + t1.getMessage()); + } + } + } + } + + public void setPurgingEnabled(boolean purgingEnabled) { + this.incrementalDataPurging = purgingEnabled; + } + + public void startSources() { + if (running) { + log.warn("Error calling startSources() for Siddhi App '" + siddhiAppContext.getName() + "', " + + "SiddhiApp already started with the sources."); + return; + } + if (!runningWithoutSources) { + throw new SiddhiAppRuntimeException("Cannot call startSources() without calling startWithoutSources() " + + "for Siddhi App '" + siddhiAppContext.getName() + "'"); + } else { + try { + for (List sources : sourceMap.values()) { + for (Source source : sources) { + source.connectWithRetry(); + } + } + running = true; + runningWithoutSources = false; + } catch (Throwable t) { + log.error("Error starting Siddhi App '" + siddhiAppContext.getName() + "', " + + "triggering shutdown process. " + t.getMessage()); + try { + shutdown(); + } catch (Throwable t1) { + log.error("Error shutting down partially started Siddhi App '" + siddhiAppContext.getName() + "', " + + t1.getMessage()); + } + } + } + } + + public synchronized void shutdown() { + SourceHandlerManager sourceHandlerManager = siddhiAppContext.getSiddhiContext().getSourceHandlerManager(); + for (List sources : sourceMap.values()) { + for (Source source : sources) { + try { + if (sourceHandlerManager != null) { + sourceHandlerManager.unregisterSourceHandler(source.getMapper().getHandler(). + getId()); + } + source.shutdown(); + } catch (Throwable t) { + log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext + (t, siddhiAppContext)) + " Error in shutting down source '" + StringUtil. + removeCRLFCharacters(source.getType()) + "' at '" + + StringUtil.removeCRLFCharacters(source.getStreamDefinition().getId()) + + "' on Siddhi App '" + siddhiAppContext.getName() + "'.", t); + } + } + } + + for (Table table : tableMap.values()) { + try { + table.shutdown(); + } catch (Throwable t) { + log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(t, siddhiAppContext)) + + " Error in shutting down table '" + + StringUtil.removeCRLFCharacters(table.getTableDefinition().getId()) + "' on Siddhi App '" + + StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t); + } + } + + SinkHandlerManager sinkHandlerManager = siddhiAppContext.getSiddhiContext().getSinkHandlerManager(); + for (List sinks : sinkMap.values()) { + for (Sink sink : sinks) { + try { + if (sinkHandlerManager != null) { + sinkHandlerManager.unregisterSinkHandler(sink.getHandler().getId()); + } + sink.shutdown(); + } catch (Throwable t) { + log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext + (t, siddhiAppContext)) + " Error in shutting down sink '" + StringUtil. + removeCRLFCharacters(sink.getType()) + "' at '" + StringUtil.removeCRLFCharacters(sink. + getStreamDefinition().getId()) + "' on Siddhi App '" + + StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t); + } + } + } + + for (Table table : tableMap.values()) { + RecordTableHandlerManager recordTableHandlerManager = siddhiAppContext.getSiddhiContext(). + getRecordTableHandlerManager(); + if (recordTableHandlerManager != null) { + String elementId = null; + RecordTableHandler recordTableHandler = table.getHandler(); + if (recordTableHandler != null) { + elementId = recordTableHandler.getId(); + } + if (elementId != null) { + recordTableHandlerManager.unregisterRecordTableHandler(elementId); + } + } + table.shutdown(); + } + + for (ExternalReferencedHolder externalReferencedHolder : siddhiAppContext.getExternalReferencedHolders()) { + try { + externalReferencedHolder.stop(); + } catch (Throwable t) { + log.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(t, siddhiAppContext)) + + " Error while stopping ExternalReferencedHolder '" + + StringUtil.removeCRLFCharacters(externalReferencedHolder.toString()) + "' down Siddhi app '" + + StringUtil.removeCRLFCharacters(siddhiAppContext.getName()) + "'.", t); + } + } + inputManager.disconnect(); + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + + } + for (StreamJunction streamJunction : streamJunctionMap.values()) { + streamJunction.stopProcessing(); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + + } + siddhiAppContext.getScheduledExecutorService().shutdownNow(); + siddhiAppContext.getExecutorService().shutdownNow(); + + } + }, "Siddhi-SiddhiApp-" + siddhiAppContext.getName() + "-Shutdown-Cleaner"); + thread.start(); + + if (siddhiAppRuntimeMap != null) { + siddhiAppRuntimeMap.remove(siddhiAppContext.getName()); + } + + if (siddhiAppContext.getStatisticsManager() != null) { + if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.OFF) != 0) { + siddhiAppContext.getStatisticsManager().stopReporting(); + } + siddhiAppContext.getStatisticsManager().cleanup(); + } + running = false; + runningWithoutSources = false; + } + + public synchronized SiddhiDebugger debug() { + siddhiDebugger = new SiddhiDebugger(siddhiAppContext); + List streamRuntime = new ArrayList<>(); + List streamCallbacks = new ArrayList<>(); + for (QueryRuntime queryRuntime : queryProcessorMap.values()) { + streamRuntime.add(((QueryRuntimeImpl) queryRuntime).getStreamRuntime()); + streamCallbacks.add(((QueryRuntimeImpl) queryRuntime).getOutputCallback()); + } + for (StreamRuntime streamRuntime1 : streamRuntime) { + for (SingleStreamRuntime singleStreamRuntime : streamRuntime1.getSingleStreamRuntimes()) { + singleStreamRuntime.getProcessStreamReceiver().setSiddhiDebugger(siddhiDebugger); + } + } + for (OutputCallback callback : streamCallbacks) { + callback.setSiddhiDebugger(siddhiDebugger); + } + start(); + return siddhiDebugger; + } + + public PersistenceReference persist() { + try { + // first, pause all the event sources + sourceMap.values().forEach(list -> list.forEach(Source::pause)); + // take snapshots of execution units + if (siddhiAppContext.getSiddhiContext().getPersistenceStore() != null) { + return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().fullSnapshot(), + siddhiAppContext); + } else { + return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().incrementalSnapshot(), + siddhiAppContext); + } + } finally { + // at the end, resume the event sources + sourceMap.values().forEach(list -> list.forEach(Source::resume)); + } + } + + public byte[] snapshot() { + try { + // first, pause all the event sources + sourceMap.values().forEach(list -> list.forEach(Source::pause)); + // take snapshots of execution units + return siddhiAppContext.getSnapshotService().fullSnapshot(); + } finally { + // at the end, resume the event sources + sourceMap.values().forEach(list -> list.forEach(Source::resume)); + } + } + + public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException { + try { + // first, pause all the event sources + sourceMap.values().forEach(list -> list.forEach(Source::pause)); + // start the restoring process + siddhiAppContext.getSnapshotService().restore(snapshot); + } finally { + // at the end, resume the event sources + sourceMap.values().forEach(list -> list.forEach(Source::resume)); + } + } + + public void restoreRevision(String revision) throws CannotRestoreSiddhiAppStateException { + try { + // first, pause all the event sources + sourceMap.values().forEach(list -> list.forEach(Source::pause)); + // start the restoring process + siddhiAppContext.getSnapshotService().restoreRevision(revision); + } finally { + // at the end, resume the event sources + sourceMap.values().forEach(list -> list.forEach(Source::resume)); + } + } + + public String restoreLastRevision() throws CannotRestoreSiddhiAppStateException { + String revision; + try { + // first, pause all the event sources + sourceMap.values().forEach(list -> list.forEach(Source::pause)); + // start the restoring process + revision = siddhiAppContext.getSnapshotService().restoreLastRevision(); + } finally { + // at the end, resume the event sources + sourceMap.values().forEach(list -> list.forEach(Source::resume)); + } + return revision; + } + + public void clearAllRevisions() throws CannotClearSiddhiAppStateException { + try { + // first, pause all the event sources + sourceMap.values().forEach(list -> list.forEach(Source::pause)); + // start the restoring process + siddhiAppContext.getSnapshotService().clearAllRevisions(); + } finally { + // at the end, resume the event sources + sourceMap.values().forEach(list -> list.forEach(Source::resume)); + } + } + + private void monitorQueryMemoryUsage() { + memoryUsageTracker = siddhiAppContext + .getSiddhiContext() + .getStatisticsConfiguration() + .getFactory() + .createMemoryUsageTracker(siddhiAppContext.getStatisticsManager()); + for (Map.Entry entry : queryProcessorMap.entrySet()) { + QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(), + SiddhiConstants.METRIC_INFIX_QUERIES, siddhiAppContext, memoryUsageTracker); + } + for (PartitionRuntime partitionRuntime : partitionMap.values()) { + ((PartitionRuntimeImpl) partitionRuntime).setMemoryUsageTracker(memoryUsageTracker); + } + for (Map.Entry entry : tableMap.entrySet()) { + QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(), + SiddhiConstants.METRIC_INFIX_TABLES, siddhiAppContext, memoryUsageTracker); + } + for (Map.Entry entry : windowMap.entrySet()) { + QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(), + SiddhiConstants.METRIC_INFIX_WINDOWS, siddhiAppContext, memoryUsageTracker); + } + for (Map.Entry entry : aggregationMap.entrySet()) { + QueryParserHelper.registerMemoryUsageTracking(entry.getKey(), entry.getValue(), + SiddhiConstants.METRIC_INFIX_AGGREGATIONS, siddhiAppContext, memoryUsageTracker); + } + } + + private void monitorBufferedEvents() { + bufferedEventsTracker = siddhiAppContext + .getSiddhiContext() + .getStatisticsConfiguration() + .getFactory() + .createBufferSizeTracker(siddhiAppContext.getStatisticsManager()); + for (Map.Entry entry : streamJunctionMap.entrySet()) { + registerForBufferedEvents(entry); + } + for (Map.Entry entry : partitionMap.entrySet()) { + ConcurrentMap streamJunctionMap = ((PartitionRuntimeImpl) entry.getValue()) + .getLocalStreamJunctionMap(); + for (Map.Entry streamJunctionEntry : streamJunctionMap.entrySet()) { + registerForBufferedEvents(streamJunctionEntry); + } + } + } + + private void registerForBufferedEvents(Map.Entry entry) { + if (entry.getValue().containsBufferedEvents()) { + String metricName = siddhiAppContext.getSiddhiContext().getStatisticsConfiguration().getMetricPrefix() + + SiddhiConstants.METRIC_DELIMITER + SiddhiConstants.METRIC_INFIX_SIDDHI_APPS + + SiddhiConstants.METRIC_DELIMITER + getName() + SiddhiConstants.METRIC_DELIMITER + + SiddhiConstants.METRIC_INFIX_SIDDHI + SiddhiConstants.METRIC_DELIMITER + + SiddhiConstants.METRIC_INFIX_STREAMS + SiddhiConstants.METRIC_DELIMITER + + entry.getKey() + SiddhiConstants.METRIC_DELIMITER + "size"; + boolean matchExist = false; + for (String regex : siddhiAppContext.getIncludedMetrics()) { + if (metricName.matches(regex)) { + matchExist = true; + break; + } + } + if (matchExist) { + bufferedEventsTracker.registerEventBufferHolder(entry.getValue(), metricName); + } + } + } + + public void handleExceptionWith(ExceptionHandler exceptionHandler) { + siddhiAppContext.setDisruptorExceptionHandler(exceptionHandler); + } + + public void handleRuntimeExceptionWith(ExceptionListener exceptionListener) { + siddhiAppContext.setRuntimeExceptionListener(exceptionListener); + } + + public SiddhiApp getSiddhiApp() { + return siddhiAppContext.getSiddhiApp(); + } + + @Override + public Collection getQueries() { + return queryProcessorMap.values(); + } + + @Override + public Collection getPartitions() { + return partitionMap.values(); + } + + /** + * Method to check the Siddhi App statistics level enabled. + * + * @return Level value of Siddhi App statistics state + */ + public Level getStatisticsLevel() { + return siddhiAppContext.getRootMetricsLevel(); + } + + /** + * To enable, disable and change Siddhi App statistics level on runtime. + * + * @param level whether statistics is OFF, BASIC or DETAIL + */ + public void setStatisticsLevel(Level level) { + if (running && siddhiAppContext.getStatisticsManager() != null) { + + if (siddhiAppContext.getRootMetricsLevel().compareTo(level) == 0) { + if (level == Level.OFF) { + log.info("Siddhi App '" + getName() + "' statistics reporting is already disabled!"); + } else if (level == Level.BASIC || level == Level.DETAIL) { + log.info("Siddhi App '" + getName() + "' statistics reporting is already in " + level + " level!"); + } + } else { + if (level == Level.OFF) { + memoryUsageTracker.disableMemoryUsageMetrics(); + siddhiAppContext.setRootMetricsLevel(Level.OFF); + siddhiAppContext.getStatisticsManager().stopReporting(); + log.info("Siddhi App '" + getName() + "' statistics reporting stopped!"); + } else { + if (siddhiAppContext.getRootMetricsLevel().compareTo(Level.OFF) == 0) { + siddhiAppContext.getStatisticsManager().startReporting(); + log.debug("Siddhi App '" + getName() + "' statistics reporting started!"); + } + if (level == Level.DETAIL) { + memoryUsageTracker.enableMemoryUsageMetrics(); + } + siddhiAppContext.setRootMetricsLevel(level); + log.info("Siddhi App '" + getName() + "' statistics reporting changed to: " + level.toString()); + } + } + } else { + if (running) { + log.debug("Siddhi App '" + getName() + "' statistics reporting not changed, " + + "as app has not started running!"); + } else { + log.debug("Siddhi App '" + getName() + "' statistics reporting not changed, as StatisticsManager" + + " is not defined!"); + } + } + } + + /** + * To enable and disable Siddhi App playback mode on runtime along with optional parameters. + * + * @param playBackEnabled whether playback is enabled or not + * @param idleTime + * @param incrementInMilliseconds + */ + public void enablePlayBack(boolean playBackEnabled, Long idleTime, Long incrementInMilliseconds) { + this.siddhiAppContext.setPlayback(playBackEnabled); + if (!playBackEnabled) { + for (Scheduler scheduler : siddhiAppContext.getSchedulerList()) { + scheduler.switchToLiveMode(); + } + } else { + if (idleTime != null && incrementInMilliseconds != null) { + //Only use if both values are present. Else defaults will be used which got assigned when creating + // the siddhi app runtimes if app contained playback information + this.siddhiAppContext.getTimestampGenerator().setIdleTime(idleTime); + this.siddhiAppContext.getTimestampGenerator().setIncrementInMilliseconds(incrementInMilliseconds); + } + + for (Scheduler scheduler : siddhiAppContext.getSchedulerList()) { + scheduler.switchToPlayBackMode(); + } + } + } +} diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java b/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java index 07ff31db07..8f7b6209b7 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java @@ -76,6 +76,7 @@ public class SiddhiAppContext { private List schedulerList; private static final ThreadLocal GROUP_BY_KEY = new ThreadLocal<>(); private static final ThreadLocal PARTITION_KEY = new ThreadLocal<>(); + private SiddhiApp siddhiApp; public SiddhiAppContext() { this.externalReferencedHolders = Collections.synchronizedList(new LinkedList<>()); @@ -264,6 +265,14 @@ public void setSiddhiAppString(String siddhiAppString) { this.siddhiAppString = siddhiAppString; } + public void setSiddhiApp(SiddhiApp siddhiApp) { + this.siddhiApp = siddhiApp; + } + + public SiddhiApp getSiddhiApp() { + return siddhiApp; + } + public List getIncludedMetrics() { return includedMetrics; } @@ -311,4 +320,5 @@ public StateHolder generateStateHolder(String name, StateFactory stateFactory, b return new EmptyStateHolder(); } } + } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiQueryContext.java b/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiQueryContext.java index 636a278b62..e989a580b4 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiQueryContext.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiQueryContext.java @@ -45,8 +45,8 @@ public class SiddhiQueryContext { private boolean partitioned; private OutputStream.OutputEventType outputEventType; private LatencyTracker latencyTracker; - private Map stateHolderMap; private IdGenerator idGenerator; + private boolean stateful = false; public SiddhiQueryContext(SiddhiAppContext siddhiAppContext, String queryName) { this(siddhiAppContext, queryName, SiddhiConstants.PARTITION_ID_DEFAULT); @@ -130,12 +130,18 @@ public StateHolder generateStateHolder(String name, boolean groupBy, StateFactor if (SnapshotService.getSkipStateStorageThreadLocal().get() == null || !SnapshotService.getSkipStateStorageThreadLocal().get()) { - stateHolderMap = siddhiAppContext.getSnapshotService().getStateHolderMap(partitionId, this.getName()); + Map stateHolderMap = + siddhiAppContext.getSnapshotService().getStateHolderMap(partitionId, this.getName()); stateHolderMap.put(idGenerator.createNewId() + "-" + name, stateHolder); } + stateful = true; return stateHolder; } else { return new EmptyStateHolder(); } } + + public boolean isStateful() { + return stateful; + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntime.java index 46a3d636e6..f4534898e9 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntime.java @@ -17,420 +17,24 @@ */ package io.siddhi.core.partition; -import io.siddhi.core.config.SiddhiAppContext; -import io.siddhi.core.event.state.MetaStateEvent; -import io.siddhi.core.event.stream.MetaStreamEvent; -import io.siddhi.core.exception.SiddhiAppCreationException; -import io.siddhi.core.executor.VariableExpressionExecutor; -import io.siddhi.core.partition.executor.PartitionExecutor; import io.siddhi.core.query.QueryRuntime; -import io.siddhi.core.query.input.stream.join.JoinStreamRuntime; -import io.siddhi.core.query.input.stream.single.SingleStreamRuntime; -import io.siddhi.core.query.input.stream.state.StateStreamRuntime; -import io.siddhi.core.query.output.callback.InsertIntoStreamCallback; -import io.siddhi.core.query.output.callback.InsertIntoWindowCallback; -import io.siddhi.core.stream.StreamJunction; -import io.siddhi.core.util.SiddhiConstants; -import io.siddhi.core.util.parser.helper.DefinitionParserHelper; -import io.siddhi.core.util.parser.helper.QueryParserHelper; -import io.siddhi.core.util.snapshot.state.State; -import io.siddhi.core.util.snapshot.state.StateHolder; -import io.siddhi.core.util.statistics.MemoryUsageTracker; -import io.siddhi.query.api.annotation.Annotation; -import io.siddhi.query.api.annotation.Element; import io.siddhi.query.api.definition.AbstractDefinition; -import io.siddhi.query.api.definition.StreamDefinition; -import io.siddhi.query.api.exception.DuplicateAnnotationException; -import io.siddhi.query.api.execution.partition.Partition; -import io.siddhi.query.api.execution.query.Query; -import io.siddhi.query.api.execution.query.input.state.CountStateElement; -import io.siddhi.query.api.execution.query.input.state.EveryStateElement; -import io.siddhi.query.api.execution.query.input.state.LogicalStateElement; -import io.siddhi.query.api.execution.query.input.state.NextStateElement; -import io.siddhi.query.api.execution.query.input.state.StateElement; -import io.siddhi.query.api.execution.query.input.state.StreamStateElement; -import io.siddhi.query.api.execution.query.input.stream.JoinInputStream; -import io.siddhi.query.api.execution.query.input.stream.SingleInputStream; -import io.siddhi.query.api.execution.query.input.stream.StateInputStream; -import io.siddhi.query.api.execution.query.output.stream.InsertIntoStream; -import io.siddhi.query.api.expression.Expression; -import io.siddhi.query.api.util.AnnotationHelper; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; +import java.util.Collection; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; /** * Runtime class to handle partitioning. It will hold all information regarding current partitions and wil create * partition dynamically during runtime. */ -public class PartitionRuntime { +public interface PartitionRuntime { - private final StateHolder stateHolder; - //default every 5 min - private long purgeExecutionInterval = 300000; - private boolean purgingEnabled = false; - private long purgeIdlePeriod = 0; - private String partitionName; - private Partition partition; - private ConcurrentMap localStreamJunctionMap = new ConcurrentHashMap<>(); - private ConcurrentMap innerPartitionStreamReceiverStreamJunctionMap = - new ConcurrentHashMap<>(); //contains definition - private ConcurrentMap localStreamDefinitionMap = - new ConcurrentHashMap<>(); //contains stream definition - private ConcurrentMap streamDefinitionMap; - private ConcurrentMap windowDefinitionMap; - private ConcurrentMap streamJunctionMap; - private List queryRuntimeList = new ArrayList(); - private ConcurrentMap partitionStreamReceivers = new ConcurrentHashMap<>(); - private SiddhiAppContext siddhiAppContext; + String getPartitionName(); - public PartitionRuntime(ConcurrentMap streamDefinitionMap, - ConcurrentMap windowDefinitionMap, - ConcurrentMap streamJunctionMap, - Partition partition, int partitionIndex, SiddhiAppContext siddhiAppContext) { - this.siddhiAppContext = siddhiAppContext; - if (partition.getPartitionTypeMap().isEmpty()) { - throw new SiddhiAppCreationException("Partition must have at least one executor. But found none."); - } - try { - Element element = AnnotationHelper.getAnnotationElement("info", "name", - partition.getAnnotations()); - if (element != null) { - this.partitionName = element.getValue(); - } - } catch (DuplicateAnnotationException e) { - throw new DuplicateAnnotationException(e.getMessageWithOutContext() + " for the same Query " + - partition.toString(), e, e.getQueryContextStartIndex(), e.getQueryContextEndIndex(), - siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString()); - } - if (partitionName == null) { - this.partitionName = "partition_" + partitionIndex; - } + Map getLocalStreamDefinitionMap(); - Annotation purge = AnnotationHelper.getAnnotation(SiddhiConstants.NAMESPACE_PURGE, partition.getAnnotations()); - if (purge != null) { - if (purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_ENABLE) != null) { - String purgeEnable = purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_ENABLE); - if (!("true".equalsIgnoreCase(purgeEnable) || "false".equalsIgnoreCase(purgeEnable))) { - throw new SiddhiAppCreationException("Invalid value for enable: " + purgeEnable + "." + - " Please use 'true' or 'false'"); - } else { - purgingEnabled = Boolean.parseBoolean(purgeEnable); - } - } else { - throw new SiddhiAppCreationException("Annotation @" + SiddhiConstants.NAMESPACE_PURGE + - " is missing element '" + SiddhiConstants.ANNOTATION_ELEMENT_ENABLE + "'"); - } - if (purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_IDLE_PERIOD) != null) { - String purgeIdle = purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_IDLE_PERIOD); - purgeIdlePeriod = Expression.Time.timeToLong(purgeIdle); + Collection getQueries(); - } else { - throw new SiddhiAppCreationException("Annotation @" + SiddhiConstants.NAMESPACE_PURGE + - " is missing element '" + SiddhiConstants.ANNOTATION_ELEMENT_IDLE_PERIOD + "'"); - } - - if (purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_INTERVAL) != null) { - String interval = purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_INTERVAL); - purgeExecutionInterval = Expression.Time.timeToLong(interval); - } - } - this.partition = partition; - this.streamDefinitionMap = streamDefinitionMap; - this.windowDefinitionMap = windowDefinitionMap; - this.streamJunctionMap = streamJunctionMap; - - this.stateHolder = siddhiAppContext.generateStateHolder(partitionName, () -> new PartitionState()); - } - - public void addQuery(QueryRuntime metaQueryRuntime) { - Query query = metaQueryRuntime.getQuery(); - - if (query.getOutputStream() instanceof InsertIntoStream && - metaQueryRuntime.getOutputCallback() instanceof InsertIntoStreamCallback) { - InsertIntoStreamCallback insertIntoStreamCallback = (InsertIntoStreamCallback) metaQueryRuntime - .getOutputCallback(); - StreamDefinition streamDefinition = insertIntoStreamCallback.getOutputStreamDefinition(); - String id = streamDefinition.getId(); - - if (((InsertIntoStream) query.getOutputStream()).isInnerStream()) { - metaQueryRuntime.setToLocalStream(true); - localStreamDefinitionMap.putIfAbsent(id, streamDefinition); - DefinitionParserHelper.validateOutputStream(streamDefinition, localStreamDefinitionMap.get(id)); - - StreamJunction outputStreamJunction = localStreamJunctionMap.get(id); - - if (outputStreamJunction == null) { - outputStreamJunction = new StreamJunction(streamDefinition, - siddhiAppContext.getExecutorService(), - siddhiAppContext.getBufferSize(), - null, siddhiAppContext); - localStreamJunctionMap.putIfAbsent(id, outputStreamJunction); - } - insertIntoStreamCallback.init(localStreamJunctionMap.get(id)); - } else { - streamDefinitionMap.putIfAbsent(id, streamDefinition); - DefinitionParserHelper.validateOutputStream(streamDefinition, streamDefinitionMap.get(id)); - StreamJunction outputStreamJunction = streamJunctionMap.get(id); - - if (outputStreamJunction == null) { - outputStreamJunction = new StreamJunction(streamDefinition, - siddhiAppContext.getExecutorService(), - siddhiAppContext.getBufferSize(), - null, siddhiAppContext); - streamJunctionMap.putIfAbsent(id, outputStreamJunction); - } - insertIntoStreamCallback.init(streamJunctionMap.get(id)); - } - } else if (query.getOutputStream() instanceof InsertIntoStream && - metaQueryRuntime.getOutputCallback() instanceof InsertIntoWindowCallback) { - InsertIntoWindowCallback insertIntoWindowCallback = (InsertIntoWindowCallback) - metaQueryRuntime.getOutputCallback(); - StreamDefinition streamDefinition = insertIntoWindowCallback.getOutputStreamDefinition(); - String id = streamDefinition.getId(); - DefinitionParserHelper.validateOutputStream(streamDefinition, windowDefinitionMap.get(id)); - StreamJunction outputStreamJunction = streamJunctionMap.get(id); - - if (outputStreamJunction == null) { - outputStreamJunction = new StreamJunction(streamDefinition, - siddhiAppContext.getExecutorService(), - siddhiAppContext.getBufferSize(), - null, siddhiAppContext); - streamJunctionMap.putIfAbsent(id, outputStreamJunction); - } - insertIntoWindowCallback.getWindow().setPublisher(streamJunctionMap.get(insertIntoWindowCallback - .getOutputStreamDefinition().getId()).constructPublisher()); - } - - if (metaQueryRuntime.isFromLocalStream()) { - for (int i = 0; i < metaQueryRuntime.getStreamRuntime().getSingleStreamRuntimes().size(); i++) { - String streamId = metaQueryRuntime.getStreamRuntime().getSingleStreamRuntimes().get(i) - .getProcessStreamReceiver().getStreamId(); - if (streamId.startsWith("#")) { - StreamDefinition streamDefinition = (StreamDefinition) localStreamDefinitionMap.get(streamId); - StreamJunction streamJunction = localStreamJunctionMap.get(streamId); - if (streamJunction == null) { - streamJunction = new StreamJunction(streamDefinition, siddhiAppContext - .getExecutorService(), - siddhiAppContext.getBufferSize(), - null, siddhiAppContext); - localStreamJunctionMap.put(streamId, streamJunction); - } - streamJunction.subscribe(metaQueryRuntime.getStreamRuntime().getSingleStreamRuntimes().get - (i).getProcessStreamReceiver()); - } - } - } - - queryRuntimeList.add(metaQueryRuntime); - } - - public void addPartitionReceiver(QueryRuntime queryRuntime, List executors, - MetaStateEvent metaEvent) { - Query query = queryRuntime.getQuery(); - List> partitionExecutors = new StreamPartitioner(query.getInputStream(), - partition, metaEvent, executors, queryRuntime.getSiddhiQueryContext()).getPartitionExecutorLists(); - if (queryRuntime.getStreamRuntime() instanceof SingleStreamRuntime) { - SingleInputStream singleInputStream = (SingleInputStream) query.getInputStream(); - addPartitionReceiver(singleInputStream.getStreamId(), singleInputStream.isInnerStream(), metaEvent - .getMetaStreamEvent(0), partitionExecutors.get(0)); - } else if (queryRuntime.getStreamRuntime() instanceof JoinStreamRuntime) { - SingleInputStream leftSingleInputStream = (SingleInputStream) ((JoinInputStream) query.getInputStream()) - .getLeftInputStream(); - addPartitionReceiver(leftSingleInputStream.getStreamId(), leftSingleInputStream.isInnerStream(), - metaEvent.getMetaStreamEvent(0), partitionExecutors.get(0)); - SingleInputStream rightSingleInputStream = (SingleInputStream) ((JoinInputStream) query.getInputStream()) - .getRightInputStream(); - addPartitionReceiver(rightSingleInputStream.getStreamId(), rightSingleInputStream.isInnerStream(), - metaEvent.getMetaStreamEvent(1), partitionExecutors.get(1)); - } else if (queryRuntime.getStreamRuntime() instanceof StateStreamRuntime) { - StateElement stateElement = ((StateInputStream) query.getInputStream()).getStateElement(); - addPartitionReceiverForStateElement(stateElement, metaEvent, partitionExecutors, 0); - } - } - - private int addPartitionReceiverForStateElement(StateElement stateElement, MetaStateEvent metaEvent, - List> partitionExecutors, - int executorIndex) { - if (stateElement instanceof EveryStateElement) { - return addPartitionReceiverForStateElement(((EveryStateElement) stateElement).getStateElement(), - metaEvent, partitionExecutors, executorIndex); - } else if (stateElement instanceof NextStateElement) { - executorIndex = addPartitionReceiverForStateElement(((NextStateElement) stateElement).getStateElement(), - metaEvent, partitionExecutors, executorIndex); - return addPartitionReceiverForStateElement(((NextStateElement) stateElement).getNextStateElement(), - metaEvent, partitionExecutors, executorIndex); - } else if (stateElement instanceof CountStateElement) { - return addPartitionReceiverForStateElement(((CountStateElement) stateElement).getStreamStateElement(), - metaEvent, partitionExecutors, executorIndex); - } else if (stateElement instanceof LogicalStateElement) { - executorIndex = addPartitionReceiverForStateElement(((LogicalStateElement) stateElement) - .getStreamStateElement1(), metaEvent, - partitionExecutors, executorIndex); - return addPartitionReceiverForStateElement(((LogicalStateElement) stateElement).getStreamStateElement2(), - metaEvent, partitionExecutors, executorIndex); - } else { //if stateElement is an instanceof StreamStateElement - SingleInputStream singleInputStream = ((StreamStateElement) stateElement).getBasicSingleInputStream(); - addPartitionReceiver(singleInputStream.getStreamId(), singleInputStream.isInnerStream(), metaEvent - .getMetaStreamEvent(executorIndex), partitionExecutors.get(executorIndex)); - return ++executorIndex; - } - } - - private void addPartitionReceiver(String streamId, boolean isInnerStream, MetaStreamEvent metaStreamEvent, - List partitionExecutors) { - if (!partitionStreamReceivers.containsKey(streamId) && !isInnerStream && - metaStreamEvent.getEventType() == MetaStreamEvent.EventType.DEFAULT) { - StreamDefinition streamDefinition = (StreamDefinition) streamDefinitionMap.get(streamId); - if (streamDefinition == null) { - streamDefinition = (StreamDefinition) windowDefinitionMap.get(streamId); - } - PartitionStreamReceiver partitionStreamReceiver = new PartitionStreamReceiver( - siddhiAppContext, metaStreamEvent, streamDefinition, partitionExecutors, this); - partitionStreamReceivers.put(partitionStreamReceiver.getStreamId(), partitionStreamReceiver); - streamJunctionMap.get(partitionStreamReceiver.getStreamId()).subscribe(partitionStreamReceiver); - } - - } - - private StreamJunction createStreamJunction(StreamDefinition streamDefinition) { - return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorService(), - siddhiAppContext.getBufferSize(), null, siddhiAppContext); - } - - public void addInnerpartitionStreamReceiverStreamJunction(String key, StreamJunction streamJunction) { - innerPartitionStreamReceiverStreamJunctionMap.put(key, streamJunction); - } - - public ConcurrentMap getInnerPartitionStreamReceiverStreamJunctionMap() { - return innerPartitionStreamReceiverStreamJunctionMap; - } - - public void init() { - for (PartitionStreamReceiver partitionStreamReceiver : partitionStreamReceivers.values()) { - partitionStreamReceiver.addStreamJunction(queryRuntimeList); - partitionStreamReceiver.init(); - } - } - - public String getPartitionName() { - return partitionName; - } - - public ConcurrentMap getLocalStreamDefinitionMap() { - return localStreamDefinitionMap; - } - - public ConcurrentMap getLocalStreamJunctionMap() { - return localStreamJunctionMap; - } - - - public void setMemoryUsageTracker(MemoryUsageTracker memoryUsageTracker) { - for (QueryRuntime queryRuntime : queryRuntimeList) { - QueryParserHelper.registerMemoryUsageTracking(queryRuntime.getQueryId(), queryRuntime, - SiddhiConstants.METRIC_INFIX_QUERIES, siddhiAppContext, memoryUsageTracker); - } - } - - public void initPartition() { - PartitionState state = stateHolder.getState(); - try { - Long time = state.partitionKeys.get(SiddhiAppContext.getPartitionFlowId()); - if (time == null) { - synchronized (state) { - time = state.partitionKeys.get(SiddhiAppContext.getPartitionFlowId()); - if (time == null) { - for (QueryRuntime queryRuntime : queryRuntimeList) { - queryRuntime.initPartition(); - } - } - state.partitionKeys.put(SiddhiAppContext.getPartitionFlowId(), - siddhiAppContext.getTimestampGenerator().currentTime()); - } - } else { - state.partitionKeys.put(SiddhiAppContext.getPartitionFlowId(), - siddhiAppContext.getTimestampGenerator().currentTime()); - } - } finally { - stateHolder.returnState(state); - } - if (purgingEnabled) { - siddhiAppContext.getScheduledExecutorService().scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - long currentTime = siddhiAppContext.getTimestampGenerator().currentTime(); - PartitionState state = stateHolder.getState(); - try { - synchronized (state) { - HashMap partitions = new HashMap<>(state.partitionKeys); - for (Map.Entry partition : partitions.entrySet()) { - if (partition.getValue() + purgeIdlePeriod < currentTime) { - state.partitionKeys.remove(partition.getKey()); - SiddhiAppContext.startPartitionFlow(partition.getKey()); - try { - for (QueryRuntime queryRuntime : queryRuntimeList) { - Map elementHolderMap = - siddhiAppContext.getSnapshotService().getStateHolderMap( - partitionName, queryRuntime.getQueryId()); - for (StateHolder stateHolder : elementHolderMap.values()) { - stateHolder.cleanGroupByStates(); - } - } - } finally { - SiddhiAppContext.stopPartitionFlow(); - } - } - } - } - } finally { - stateHolder.returnState(state); - } - } - }, purgeExecutionInterval, purgeExecutionInterval, TimeUnit.MILLISECONDS); - } - } - - public Set getPartitionKeys() { - PartitionState state = stateHolder.getState(); - try { - return new HashSet<>(state.partitionKeys.keySet()); - } finally { - stateHolder.returnState(state); - } - } - - /** - * State of partition - */ - public class PartitionState extends State { - - private Map partitionKeys = new ConcurrentHashMap<>(); - - @Override - public boolean canDestroy() { - return partitionKeys.isEmpty(); - } - - @Override - public Map snapshot() { - Map state = new HashMap<>(); - state.put("PartitionKeys", partitionKeys); - return state; - } - - @Override - public void restore(Map state) { - partitionKeys = (Map) state.get("PartitionKeys"); - } - } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntimeImpl.java b/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntimeImpl.java new file mode 100644 index 0000000000..8333264b8a --- /dev/null +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntimeImpl.java @@ -0,0 +1,442 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.core.partition; + +import io.siddhi.core.config.SiddhiAppContext; +import io.siddhi.core.event.state.MetaStateEvent; +import io.siddhi.core.event.stream.MetaStreamEvent; +import io.siddhi.core.exception.SiddhiAppCreationException; +import io.siddhi.core.executor.VariableExpressionExecutor; +import io.siddhi.core.partition.executor.PartitionExecutor; +import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.query.QueryRuntimeImpl; +import io.siddhi.core.query.input.stream.join.JoinStreamRuntime; +import io.siddhi.core.query.input.stream.single.SingleStreamRuntime; +import io.siddhi.core.query.input.stream.state.StateStreamRuntime; +import io.siddhi.core.query.output.callback.InsertIntoStreamCallback; +import io.siddhi.core.query.output.callback.InsertIntoWindowCallback; +import io.siddhi.core.stream.StreamJunction; +import io.siddhi.core.util.SiddhiConstants; +import io.siddhi.core.util.parser.helper.DefinitionParserHelper; +import io.siddhi.core.util.parser.helper.QueryParserHelper; +import io.siddhi.core.util.snapshot.state.State; +import io.siddhi.core.util.snapshot.state.StateHolder; +import io.siddhi.core.util.statistics.MemoryUsageTracker; +import io.siddhi.query.api.annotation.Annotation; +import io.siddhi.query.api.annotation.Element; +import io.siddhi.query.api.definition.AbstractDefinition; +import io.siddhi.query.api.definition.StreamDefinition; +import io.siddhi.query.api.exception.DuplicateAnnotationException; +import io.siddhi.query.api.execution.partition.Partition; +import io.siddhi.query.api.execution.query.Query; +import io.siddhi.query.api.execution.query.input.state.CountStateElement; +import io.siddhi.query.api.execution.query.input.state.EveryStateElement; +import io.siddhi.query.api.execution.query.input.state.LogicalStateElement; +import io.siddhi.query.api.execution.query.input.state.NextStateElement; +import io.siddhi.query.api.execution.query.input.state.StateElement; +import io.siddhi.query.api.execution.query.input.state.StreamStateElement; +import io.siddhi.query.api.execution.query.input.stream.JoinInputStream; +import io.siddhi.query.api.execution.query.input.stream.SingleInputStream; +import io.siddhi.query.api.execution.query.input.stream.StateInputStream; +import io.siddhi.query.api.execution.query.output.stream.InsertIntoStream; +import io.siddhi.query.api.expression.Expression; +import io.siddhi.query.api.util.AnnotationHelper; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +/** + * Runtime class to handle partitioning. It will hold all information regarding current partitions and wil create + * partition dynamically during runtime. + */ +public class PartitionRuntimeImpl implements PartitionRuntime { + + private final StateHolder stateHolder; + //default every 5 min + private long purgeExecutionInterval = 300000; + private boolean purgingEnabled = false; + private long purgeIdlePeriod = 0; + private String partitionName; + private Partition partition; + private ConcurrentMap localStreamJunctionMap = new ConcurrentHashMap<>(); + private ConcurrentMap innerPartitionStreamReceiverStreamJunctionMap = + new ConcurrentHashMap<>(); //contains definition + private ConcurrentMap localStreamDefinitionMap = + new ConcurrentHashMap<>(); //contains stream definition + private ConcurrentMap streamDefinitionMap; + private ConcurrentMap windowDefinitionMap; + private ConcurrentMap streamJunctionMap; + private List queryRuntimeList = new ArrayList(); + private ConcurrentMap partitionStreamReceivers = new ConcurrentHashMap<>(); + private SiddhiAppContext siddhiAppContext; + + public PartitionRuntimeImpl(ConcurrentMap streamDefinitionMap, + ConcurrentMap windowDefinitionMap, + ConcurrentMap streamJunctionMap, + Partition partition, int partitionIndex, SiddhiAppContext siddhiAppContext) { + this.siddhiAppContext = siddhiAppContext; + if (partition.getPartitionTypeMap().isEmpty()) { + throw new SiddhiAppCreationException("Partition must have at least one executor. But found none."); + } + try { + Element element = AnnotationHelper.getAnnotationElement("info", "name", + partition.getAnnotations()); + if (element != null) { + this.partitionName = element.getValue(); + } + } catch (DuplicateAnnotationException e) { + throw new DuplicateAnnotationException(e.getMessageWithOutContext() + " for the same Query " + + partition.toString(), e, e.getQueryContextStartIndex(), e.getQueryContextEndIndex(), + siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString()); + } + if (partitionName == null) { + this.partitionName = "partition_" + partitionIndex; + } + + Annotation purge = AnnotationHelper.getAnnotation(SiddhiConstants.NAMESPACE_PURGE, partition.getAnnotations()); + if (purge != null) { + if (purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_ENABLE) != null) { + String purgeEnable = purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_ENABLE); + if (!("true".equalsIgnoreCase(purgeEnable) || "false".equalsIgnoreCase(purgeEnable))) { + throw new SiddhiAppCreationException("Invalid value for enable: " + purgeEnable + "." + + " Please use 'true' or 'false'"); + } else { + purgingEnabled = Boolean.parseBoolean(purgeEnable); + } + } else { + throw new SiddhiAppCreationException("Annotation @" + SiddhiConstants.NAMESPACE_PURGE + + " is missing element '" + SiddhiConstants.ANNOTATION_ELEMENT_ENABLE + "'"); + } + if (purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_IDLE_PERIOD) != null) { + String purgeIdle = purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_IDLE_PERIOD); + purgeIdlePeriod = Expression.Time.timeToLong(purgeIdle); + + } else { + throw new SiddhiAppCreationException("Annotation @" + SiddhiConstants.NAMESPACE_PURGE + + " is missing element '" + SiddhiConstants.ANNOTATION_ELEMENT_IDLE_PERIOD + "'"); + } + + if (purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_INTERVAL) != null) { + String interval = purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_INTERVAL); + purgeExecutionInterval = Expression.Time.timeToLong(interval); + } + } + this.partition = partition; + this.streamDefinitionMap = streamDefinitionMap; + this.windowDefinitionMap = windowDefinitionMap; + this.streamJunctionMap = streamJunctionMap; + + this.stateHolder = siddhiAppContext.generateStateHolder(partitionName, () -> new PartitionState()); + } + + public void addQuery(QueryRuntimeImpl metaQueryRuntime) { + Query query = metaQueryRuntime.getQuery(); + + if (query.getOutputStream() instanceof InsertIntoStream && + metaQueryRuntime.getOutputCallback() instanceof InsertIntoStreamCallback) { + InsertIntoStreamCallback insertIntoStreamCallback = (InsertIntoStreamCallback) metaQueryRuntime + .getOutputCallback(); + StreamDefinition streamDefinition = insertIntoStreamCallback.getOutputStreamDefinition(); + String id = streamDefinition.getId(); + + if (((InsertIntoStream) query.getOutputStream()).isInnerStream()) { + metaQueryRuntime.setToLocalStream(true); + localStreamDefinitionMap.putIfAbsent(id, streamDefinition); + DefinitionParserHelper.validateOutputStream(streamDefinition, localStreamDefinitionMap.get(id)); + + StreamJunction outputStreamJunction = localStreamJunctionMap.get(id); + + if (outputStreamJunction == null) { + outputStreamJunction = new StreamJunction(streamDefinition, + siddhiAppContext.getExecutorService(), + siddhiAppContext.getBufferSize(), + null, siddhiAppContext); + localStreamJunctionMap.putIfAbsent(id, outputStreamJunction); + } + insertIntoStreamCallback.init(localStreamJunctionMap.get(id)); + } else { + streamDefinitionMap.putIfAbsent(id, streamDefinition); + DefinitionParserHelper.validateOutputStream(streamDefinition, streamDefinitionMap.get(id)); + StreamJunction outputStreamJunction = streamJunctionMap.get(id); + + if (outputStreamJunction == null) { + outputStreamJunction = new StreamJunction(streamDefinition, + siddhiAppContext.getExecutorService(), + siddhiAppContext.getBufferSize(), + null, siddhiAppContext); + streamJunctionMap.putIfAbsent(id, outputStreamJunction); + } + insertIntoStreamCallback.init(streamJunctionMap.get(id)); + } + } else if (query.getOutputStream() instanceof InsertIntoStream && + metaQueryRuntime.getOutputCallback() instanceof InsertIntoWindowCallback) { + InsertIntoWindowCallback insertIntoWindowCallback = (InsertIntoWindowCallback) + metaQueryRuntime.getOutputCallback(); + StreamDefinition streamDefinition = insertIntoWindowCallback.getOutputStreamDefinition(); + String id = streamDefinition.getId(); + DefinitionParserHelper.validateOutputStream(streamDefinition, windowDefinitionMap.get(id)); + StreamJunction outputStreamJunction = streamJunctionMap.get(id); + + if (outputStreamJunction == null) { + outputStreamJunction = new StreamJunction(streamDefinition, + siddhiAppContext.getExecutorService(), + siddhiAppContext.getBufferSize(), + null, siddhiAppContext); + streamJunctionMap.putIfAbsent(id, outputStreamJunction); + } + insertIntoWindowCallback.getWindow().setPublisher(streamJunctionMap.get(insertIntoWindowCallback + .getOutputStreamDefinition().getId()).constructPublisher()); + } + + if (metaQueryRuntime.isFromLocalStream()) { + for (int i = 0; i < metaQueryRuntime.getStreamRuntime().getSingleStreamRuntimes().size(); i++) { + String streamId = metaQueryRuntime.getStreamRuntime().getSingleStreamRuntimes().get(i) + .getProcessStreamReceiver().getStreamId(); + if (streamId.startsWith("#")) { + StreamDefinition streamDefinition = (StreamDefinition) localStreamDefinitionMap.get(streamId); + StreamJunction streamJunction = localStreamJunctionMap.get(streamId); + if (streamJunction == null) { + streamJunction = new StreamJunction(streamDefinition, siddhiAppContext + .getExecutorService(), + siddhiAppContext.getBufferSize(), + null, siddhiAppContext); + localStreamJunctionMap.put(streamId, streamJunction); + } + streamJunction.subscribe(metaQueryRuntime.getStreamRuntime().getSingleStreamRuntimes().get + (i).getProcessStreamReceiver()); + } + } + } + + queryRuntimeList.add(metaQueryRuntime); + } + + public void addPartitionReceiver(QueryRuntimeImpl queryRuntime, List executors, + MetaStateEvent metaEvent) { + Query query = queryRuntime.getQuery(); + List> partitionExecutors = new StreamPartitioner(query.getInputStream(), + partition, metaEvent, executors, queryRuntime.getSiddhiQueryContext()).getPartitionExecutorLists(); + if (queryRuntime.getStreamRuntime() instanceof SingleStreamRuntime) { + SingleInputStream singleInputStream = (SingleInputStream) query.getInputStream(); + addPartitionReceiver(singleInputStream.getStreamId(), singleInputStream.isInnerStream(), metaEvent + .getMetaStreamEvent(0), partitionExecutors.get(0)); + } else if (queryRuntime.getStreamRuntime() instanceof JoinStreamRuntime) { + SingleInputStream leftSingleInputStream = (SingleInputStream) ((JoinInputStream) query.getInputStream()) + .getLeftInputStream(); + addPartitionReceiver(leftSingleInputStream.getStreamId(), leftSingleInputStream.isInnerStream(), + metaEvent.getMetaStreamEvent(0), partitionExecutors.get(0)); + SingleInputStream rightSingleInputStream = (SingleInputStream) ((JoinInputStream) query.getInputStream()) + .getRightInputStream(); + addPartitionReceiver(rightSingleInputStream.getStreamId(), rightSingleInputStream.isInnerStream(), + metaEvent.getMetaStreamEvent(1), partitionExecutors.get(1)); + } else if (queryRuntime.getStreamRuntime() instanceof StateStreamRuntime) { + StateElement stateElement = ((StateInputStream) query.getInputStream()).getStateElement(); + addPartitionReceiverForStateElement(stateElement, metaEvent, partitionExecutors, 0); + } + } + + private int addPartitionReceiverForStateElement(StateElement stateElement, MetaStateEvent metaEvent, + List> partitionExecutors, + int executorIndex) { + if (stateElement instanceof EveryStateElement) { + return addPartitionReceiverForStateElement(((EveryStateElement) stateElement).getStateElement(), + metaEvent, partitionExecutors, executorIndex); + } else if (stateElement instanceof NextStateElement) { + executorIndex = addPartitionReceiverForStateElement(((NextStateElement) stateElement).getStateElement(), + metaEvent, partitionExecutors, executorIndex); + return addPartitionReceiverForStateElement(((NextStateElement) stateElement).getNextStateElement(), + metaEvent, partitionExecutors, executorIndex); + } else if (stateElement instanceof CountStateElement) { + return addPartitionReceiverForStateElement(((CountStateElement) stateElement).getStreamStateElement(), + metaEvent, partitionExecutors, executorIndex); + } else if (stateElement instanceof LogicalStateElement) { + executorIndex = addPartitionReceiverForStateElement(((LogicalStateElement) stateElement) + .getStreamStateElement1(), metaEvent, + partitionExecutors, executorIndex); + return addPartitionReceiverForStateElement(((LogicalStateElement) stateElement).getStreamStateElement2(), + metaEvent, partitionExecutors, executorIndex); + } else { //if stateElement is an instanceof StreamStateElement + SingleInputStream singleInputStream = ((StreamStateElement) stateElement).getBasicSingleInputStream(); + addPartitionReceiver(singleInputStream.getStreamId(), singleInputStream.isInnerStream(), metaEvent + .getMetaStreamEvent(executorIndex), partitionExecutors.get(executorIndex)); + return ++executorIndex; + } + } + + private void addPartitionReceiver(String streamId, boolean isInnerStream, MetaStreamEvent metaStreamEvent, + List partitionExecutors) { + if (!partitionStreamReceivers.containsKey(streamId) && !isInnerStream && + metaStreamEvent.getEventType() == MetaStreamEvent.EventType.DEFAULT) { + StreamDefinition streamDefinition = (StreamDefinition) streamDefinitionMap.get(streamId); + if (streamDefinition == null) { + streamDefinition = (StreamDefinition) windowDefinitionMap.get(streamId); + } + PartitionStreamReceiver partitionStreamReceiver = new PartitionStreamReceiver( + siddhiAppContext, metaStreamEvent, streamDefinition, partitionExecutors, this); + partitionStreamReceivers.put(partitionStreamReceiver.getStreamId(), partitionStreamReceiver); + streamJunctionMap.get(partitionStreamReceiver.getStreamId()).subscribe(partitionStreamReceiver); + } + + } + + private StreamJunction createStreamJunction(StreamDefinition streamDefinition) { + return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorService(), + siddhiAppContext.getBufferSize(), null, siddhiAppContext); + } + + public void addInnerpartitionStreamReceiverStreamJunction(String key, StreamJunction streamJunction) { + innerPartitionStreamReceiverStreamJunctionMap.put(key, streamJunction); + } + + public ConcurrentMap getInnerPartitionStreamReceiverStreamJunctionMap() { + return innerPartitionStreamReceiverStreamJunctionMap; + } + + public void init() { + for (PartitionStreamReceiver partitionStreamReceiver : partitionStreamReceivers.values()) { + partitionStreamReceiver.addStreamJunction(queryRuntimeList); + partitionStreamReceiver.init(); + } + } + + public String getPartitionName() { + return partitionName; + } + + public ConcurrentMap getLocalStreamDefinitionMap() { + return localStreamDefinitionMap; + } + + public ConcurrentMap getLocalStreamJunctionMap() { + return localStreamJunctionMap; + } + + + public void setMemoryUsageTracker(MemoryUsageTracker memoryUsageTracker) { + for (QueryRuntime queryRuntime : queryRuntimeList) { + QueryParserHelper.registerMemoryUsageTracking(queryRuntime.getQueryId(), queryRuntime, + SiddhiConstants.METRIC_INFIX_QUERIES, siddhiAppContext, memoryUsageTracker); + } + } + + public void initPartition() { + PartitionState state = stateHolder.getState(); + try { + Long time = state.partitionKeys.get(SiddhiAppContext.getPartitionFlowId()); + if (time == null) { + synchronized (state) { + time = state.partitionKeys.get(SiddhiAppContext.getPartitionFlowId()); + if (time == null) { + for (QueryRuntime queryRuntime : queryRuntimeList) { + ((QueryRuntimeImpl) queryRuntime).initPartition(); + } + } + state.partitionKeys.put(SiddhiAppContext.getPartitionFlowId(), + siddhiAppContext.getTimestampGenerator().currentTime()); + } + } else { + state.partitionKeys.put(SiddhiAppContext.getPartitionFlowId(), + siddhiAppContext.getTimestampGenerator().currentTime()); + } + } finally { + stateHolder.returnState(state); + } + if (purgingEnabled) { + siddhiAppContext.getScheduledExecutorService().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + long currentTime = siddhiAppContext.getTimestampGenerator().currentTime(); + PartitionState state = stateHolder.getState(); + try { + synchronized (state) { + HashMap partitions = new HashMap<>(state.partitionKeys); + for (Map.Entry partition : partitions.entrySet()) { + if (partition.getValue() + purgeIdlePeriod < currentTime) { + state.partitionKeys.remove(partition.getKey()); + SiddhiAppContext.startPartitionFlow(partition.getKey()); + try { + for (QueryRuntime queryRuntime : queryRuntimeList) { + Map elementHolderMap = + siddhiAppContext.getSnapshotService().getStateHolderMap( + partitionName, queryRuntime.getQueryId()); + for (StateHolder stateHolder : elementHolderMap.values()) { + stateHolder.cleanGroupByStates(); + } + } + } finally { + SiddhiAppContext.stopPartitionFlow(); + } + } + } + } + } finally { + stateHolder.returnState(state); + } + } + }, purgeExecutionInterval, purgeExecutionInterval, TimeUnit.MILLISECONDS); + } + } + + public Set getPartitionKeys() { + PartitionState state = stateHolder.getState(); + try { + return new HashSet<>(state.partitionKeys.keySet()); + } finally { + stateHolder.returnState(state); + } + } + + @Override + public Collection getQueries() { + return queryRuntimeList; + } + + /** + * State of partition + */ + public class PartitionState extends State { + + private Map partitionKeys = new ConcurrentHashMap<>(); + + @Override + public boolean canDestroy() { + return partitionKeys.isEmpty(); + } + + @Override + public Map snapshot() { + Map state = new HashMap<>(); + state.put("PartitionKeys", partitionKeys); + return state; + } + + @Override + public void restore(Map state) { + partitionKeys = (Map) state.get("PartitionKeys"); + } + } + +} diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionStreamReceiver.java b/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionStreamReceiver.java index 425f707277..f42e645db6 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionStreamReceiver.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionStreamReceiver.java @@ -28,6 +28,7 @@ import io.siddhi.core.event.stream.converter.StreamEventConverterFactory; import io.siddhi.core.partition.executor.PartitionExecutor; import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.query.QueryRuntimeImpl; import io.siddhi.core.query.input.stream.StreamRuntime; import io.siddhi.core.stream.StreamJunction; import io.siddhi.query.api.definition.StreamDefinition; @@ -48,7 +49,7 @@ public class PartitionStreamReceiver implements StreamJunction.Receiver { private MetaStreamEvent metaStreamEvent; private StreamDefinition streamDefinition; private SiddhiAppContext siddhiAppContext; - private PartitionRuntime partitionRuntime; + private PartitionRuntimeImpl partitionRuntime; private List partitionExecutors; private Map streamJunctionMap = new HashMap<>(); @@ -59,7 +60,7 @@ public PartitionStreamReceiver(SiddhiAppContext siddhiAppContext, MetaStreamEven PartitionRuntime partitionRuntime) { this.metaStreamEvent = metaStreamEvent; this.streamDefinition = streamDefinition; - this.partitionRuntime = partitionRuntime; + this.partitionRuntime = (PartitionRuntimeImpl) partitionRuntime; this.partitionExecutors = partitionExecutors; this.siddhiAppContext = siddhiAppContext; this.streamId = streamDefinition.getId(); @@ -298,8 +299,8 @@ public void addStreamJunction(List queryRuntimeList) { streamJunctionMap.put(streamId, streamJunction); } for (QueryRuntime queryRuntime : queryRuntimeList) { - StreamRuntime streamRuntime = queryRuntime.getStreamRuntime(); - for (int i = 0; i < queryRuntime.getInputStreamId().size(); i++) { + StreamRuntime streamRuntime = ((QueryRuntimeImpl) queryRuntime).getStreamRuntime(); + for (int i = 0; i < ((QueryRuntimeImpl) queryRuntime).getInputStreamId().size(); i++) { if ((streamRuntime.getSingleStreamRuntimes().get(i)). getProcessStreamReceiver().getStreamId().equals(streamId)) { streamJunction.subscribe((streamRuntime.getSingleStreamRuntimes().get(i)) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/query/QueryRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/query/QueryRuntime.java index 366f329d20..2c25094157 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/query/QueryRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/query/QueryRuntime.java @@ -17,157 +17,16 @@ */ package io.siddhi.core.query; -import io.siddhi.core.config.SiddhiQueryContext; -import io.siddhi.core.event.MetaComplexEvent; -import io.siddhi.core.query.input.MultiProcessStreamReceiver; -import io.siddhi.core.query.input.stream.StreamRuntime; -import io.siddhi.core.query.input.stream.single.SingleStreamRuntime; -import io.siddhi.core.query.input.stream.state.StateStreamRuntime; -import io.siddhi.core.query.output.callback.OutputCallback; -import io.siddhi.core.query.output.callback.QueryCallback; -import io.siddhi.core.query.output.ratelimit.OutputRateLimiter; -import io.siddhi.core.query.selector.QuerySelector; -import io.siddhi.core.util.extension.holder.ExternalReferencedHolder; -import io.siddhi.core.util.statistics.MemoryCalculable; -import io.siddhi.query.api.definition.StreamDefinition; import io.siddhi.query.api.execution.query.Query; -import io.siddhi.query.api.execution.query.input.stream.JoinInputStream; -import io.siddhi.query.api.execution.query.input.stream.SingleInputStream; -import io.siddhi.query.api.execution.query.input.stream.StateInputStream; - -import java.util.List; /** * Query Runtime represent holder object for a single Siddhi query and holds all runtime objects related to that query. */ -public class QueryRuntime implements MemoryCalculable, ExternalReferencedHolder { - - private StreamRuntime streamRuntime; - private OutputRateLimiter outputRateLimiter; - private Query query; - private OutputCallback outputCallback; - private SiddhiQueryContext siddhiQueryContext; - private StreamDefinition outputStreamDefinition; - private boolean toLocalStream; - private QuerySelector selector; - private MetaComplexEvent metaComplexEvent; - - public QueryRuntime(Query query, StreamRuntime streamRuntime, QuerySelector selector, - OutputRateLimiter outputRateLimiter, OutputCallback outputCallback, - MetaComplexEvent metaComplexEvent, - SiddhiQueryContext siddhiQueryContext) { - this.query = query; - this.streamRuntime = streamRuntime; - this.selector = selector; - this.outputCallback = outputCallback; - this.siddhiQueryContext = siddhiQueryContext; - outputRateLimiter.setOutputCallback(outputCallback); - setOutputRateLimiter(outputRateLimiter); - setMetaComplexEvent(metaComplexEvent); - init(); - } - - public String getQueryId() { - return siddhiQueryContext.getName(); - } - - public void addCallback(QueryCallback callback) { - outputRateLimiter.addQueryCallback(callback); - } - - public OutputRateLimiter getOutputRateManager() { - return outputRateLimiter; - } - - public StreamDefinition getOutputStreamDefinition() { - return outputStreamDefinition; - } - - public List getInputStreamId() { - return query.getInputStream().getAllStreamIds(); - } - - public boolean isToLocalStream() { - return toLocalStream; - } - - public void setToLocalStream(boolean toLocalStream) { - this.toLocalStream = toLocalStream; - } - - public boolean isFromLocalStream() { - if (query.getInputStream() instanceof SingleInputStream) { - return ((SingleInputStream) query.getInputStream()).isInnerStream(); - } else if (query.getInputStream() instanceof JoinInputStream) { - return ((SingleInputStream) ((JoinInputStream) query.getInputStream()).getLeftInputStream()) - .isInnerStream() || ((SingleInputStream) ((JoinInputStream) query.getInputStream()) - .getRightInputStream()).isInnerStream(); - } else if (query.getInputStream() instanceof StateInputStream) { - for (String streamId : query.getInputStream().getAllStreamIds()) { - if (streamId.startsWith("#")) { - return true; - } - } - } - return false; - } - - private void setOutputRateLimiter(OutputRateLimiter outputRateLimiter) { - this.outputRateLimiter = outputRateLimiter; - selector.setNextProcessor(outputRateLimiter); - } - - public SiddhiQueryContext getSiddhiQueryContext() { - return siddhiQueryContext; - } - - public StreamRuntime getStreamRuntime() { - return streamRuntime; - } - - public MetaComplexEvent getMetaComplexEvent() { - return metaComplexEvent; - } - - private void setMetaComplexEvent(MetaComplexEvent metaComplexEvent) { - outputStreamDefinition = metaComplexEvent.getOutputStreamDefinition(); - this.metaComplexEvent = metaComplexEvent; - } - - public Query getQuery() { - return query; - } - - public OutputCallback getOutputCallback() { - return outputCallback; - } - - public void init() { - streamRuntime.setCommonProcessor(selector); - for (SingleStreamRuntime singleStreamRuntime : streamRuntime.getSingleStreamRuntimes()) { - if (singleStreamRuntime.getProcessStreamReceiver() instanceof MultiProcessStreamReceiver) { - ((MultiProcessStreamReceiver) singleStreamRuntime.getProcessStreamReceiver()) - .setOutputRateLimiter(outputRateLimiter); - } - } - } - - public QuerySelector getSelector() { - return selector; - } +public interface QueryRuntime { - public void initPartition() { - if (streamRuntime instanceof StateStreamRuntime) { - ((StateStreamRuntime) streamRuntime).initPartition(); - } - outputRateLimiter.partitionCreated(); - } + String getQueryId(); - public void start() { - initPartition(); - } + boolean isStateful(); - @Override - public void stop() { - } + Query getQuery(); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/query/QueryRuntimeImpl.java b/modules/siddhi-core/src/main/java/io/siddhi/core/query/QueryRuntimeImpl.java new file mode 100644 index 0000000000..1b671a342a --- /dev/null +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/query/QueryRuntimeImpl.java @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.core.query; + +import io.siddhi.core.config.SiddhiQueryContext; +import io.siddhi.core.event.MetaComplexEvent; +import io.siddhi.core.query.input.MultiProcessStreamReceiver; +import io.siddhi.core.query.input.stream.StreamRuntime; +import io.siddhi.core.query.input.stream.single.SingleStreamRuntime; +import io.siddhi.core.query.input.stream.state.StateStreamRuntime; +import io.siddhi.core.query.output.callback.OutputCallback; +import io.siddhi.core.query.output.callback.QueryCallback; +import io.siddhi.core.query.output.ratelimit.OutputRateLimiter; +import io.siddhi.core.query.selector.QuerySelector; +import io.siddhi.core.util.extension.holder.ExternalReferencedHolder; +import io.siddhi.core.util.statistics.MemoryCalculable; +import io.siddhi.query.api.definition.StreamDefinition; +import io.siddhi.query.api.execution.query.Query; +import io.siddhi.query.api.execution.query.input.stream.JoinInputStream; +import io.siddhi.query.api.execution.query.input.stream.SingleInputStream; +import io.siddhi.query.api.execution.query.input.stream.StateInputStream; + +import java.util.List; + +/** + * Query Runtime represent holder object for a single Siddhi query and holds all runtime objects related to that query. + */ +public class QueryRuntimeImpl implements QueryRuntime, MemoryCalculable, ExternalReferencedHolder { + + private StreamRuntime streamRuntime; + private OutputRateLimiter outputRateLimiter; + private Query query; + private OutputCallback outputCallback; + private SiddhiQueryContext siddhiQueryContext; + private StreamDefinition outputStreamDefinition; + private boolean toLocalStream; + private QuerySelector selector; + private MetaComplexEvent metaComplexEvent; + + public QueryRuntimeImpl(Query query, StreamRuntime streamRuntime, QuerySelector selector, + OutputRateLimiter outputRateLimiter, OutputCallback outputCallback, + MetaComplexEvent metaComplexEvent, + SiddhiQueryContext siddhiQueryContext) { + this.query = query; + this.streamRuntime = streamRuntime; + this.selector = selector; + this.outputCallback = outputCallback; + this.siddhiQueryContext = siddhiQueryContext; + outputRateLimiter.setOutputCallback(outputCallback); + setOutputRateLimiter(outputRateLimiter); + setMetaComplexEvent(metaComplexEvent); + init(); + } + + public String getQueryId() { + return siddhiQueryContext.getName(); + } + + public void addCallback(QueryCallback callback) { + outputRateLimiter.addQueryCallback(callback); + } + + public OutputRateLimiter getOutputRateManager() { + return outputRateLimiter; + } + + public StreamDefinition getOutputStreamDefinition() { + return outputStreamDefinition; + } + + public List getInputStreamId() { + return query.getInputStream().getAllStreamIds(); + } + + public boolean isToLocalStream() { + return toLocalStream; + } + + public boolean isStateful() { + return siddhiQueryContext.isStateful(); + } + + public void setToLocalStream(boolean toLocalStream) { + this.toLocalStream = toLocalStream; + } + + public boolean isFromLocalStream() { + if (query.getInputStream() instanceof SingleInputStream) { + return ((SingleInputStream) query.getInputStream()).isInnerStream(); + } else if (query.getInputStream() instanceof JoinInputStream) { + return ((SingleInputStream) ((JoinInputStream) query.getInputStream()).getLeftInputStream()) + .isInnerStream() || ((SingleInputStream) ((JoinInputStream) query.getInputStream()) + .getRightInputStream()).isInnerStream(); + } else if (query.getInputStream() instanceof StateInputStream) { + for (String streamId : query.getInputStream().getAllStreamIds()) { + if (streamId.startsWith("#")) { + return true; + } + } + } + return false; + } + + private void setOutputRateLimiter(OutputRateLimiter outputRateLimiter) { + this.outputRateLimiter = outputRateLimiter; + selector.setNextProcessor(outputRateLimiter); + } + + public SiddhiQueryContext getSiddhiQueryContext() { + return siddhiQueryContext; + } + + public StreamRuntime getStreamRuntime() { + return streamRuntime; + } + + public MetaComplexEvent getMetaComplexEvent() { + return metaComplexEvent; + } + + private void setMetaComplexEvent(MetaComplexEvent metaComplexEvent) { + outputStreamDefinition = metaComplexEvent.getOutputStreamDefinition(); + this.metaComplexEvent = metaComplexEvent; + } + + public Query getQuery() { + return query; + } + + public OutputCallback getOutputCallback() { + return outputCallback; + } + + public void init() { + streamRuntime.setCommonProcessor(selector); + for (SingleStreamRuntime singleStreamRuntime : streamRuntime.getSingleStreamRuntimes()) { + if (singleStreamRuntime.getProcessStreamReceiver() instanceof MultiProcessStreamReceiver) { + ((MultiProcessStreamReceiver) singleStreamRuntime.getProcessStreamReceiver()) + .setOutputRateLimiter(outputRateLimiter); + } + } + } + + public QuerySelector getSelector() { + return selector; + } + + public void initPartition() { + if (streamRuntime instanceof StateStreamRuntime) { + ((StateStreamRuntime) streamRuntime).initPartition(); + } + outputRateLimiter.partitionCreated(); + } + + public void start() { + initPartition(); + } + + @Override + public void stop() { + } +} diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/AbstractStreamProcessor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/AbstractStreamProcessor.java index 006cd89bc6..727dda47ae 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/AbstractStreamProcessor.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/AbstractStreamProcessor.java @@ -187,4 +187,7 @@ public void setToLast(Processor processor) { */ public abstract ProcessingMode getProcessingMode(); + public boolean isStateful() { + return siddhiQueryContext.isStateful(); + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/Source.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/Source.java index 053119d944..0d43dd6bcf 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/Source.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/Source.java @@ -25,6 +25,7 @@ import io.siddhi.core.util.ExceptionUtil; import io.siddhi.core.util.StringUtil; import io.siddhi.core.util.config.ConfigReader; +import io.siddhi.core.util.snapshot.state.EmptyStateHolder; import io.siddhi.core.util.snapshot.state.State; import io.siddhi.core.util.snapshot.state.StateFactory; import io.siddhi.core.util.snapshot.state.StateHolder; @@ -232,4 +233,8 @@ public void onError(ConnectionUnavailableException e) { retryWithBackoff(e); } } + + public boolean isStateful() { + return stateHolder != null && !(stateHolder instanceof EmptyStateHolder); + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java index 80ea294f4f..d516825c6a 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java @@ -29,6 +29,7 @@ import io.siddhi.core.util.StringUtil; import io.siddhi.core.util.config.ConfigReader; import io.siddhi.core.util.parser.helper.QueryParserHelper; +import io.siddhi.core.util.snapshot.state.EmptyStateHolder; import io.siddhi.core.util.snapshot.state.State; import io.siddhi.core.util.snapshot.state.StateFactory; import io.siddhi.core.util.snapshot.state.StateHolder; @@ -345,4 +346,8 @@ public enum OnErrorAction { WAIT, STREAM } + + public boolean isStateful() { + return stateHolder != null && !(stateHolder instanceof EmptyStateHolder); + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/table/InMemoryTable.java b/modules/siddhi-core/src/main/java/io/siddhi/core/table/InMemoryTable.java index f5d1101a5d..c27a7d4d31 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/table/InMemoryTable.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/table/InMemoryTable.java @@ -139,8 +139,6 @@ public void updateOrAdd(ComplexEventChunk updateOrAddingEventChunk, } } - - @Override public boolean contains(StateEvent matchingEvent, CompiledCondition compiledCondition) { readWriteLock.readLock().lock(); @@ -220,7 +218,7 @@ public class TableState extends State { public TableState(EventHolder eventHolder) { this.eventHolder = eventHolder; } - + public EventHolder getEventHolder() { return eventHolder; } @@ -246,4 +244,8 @@ public void restore(Map state) { public int size() { return stateHolder.getState().eventHolder.size(); } + + public boolean isStateful() { + return true; + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/table/Table.java b/modules/siddhi-core/src/main/java/io/siddhi/core/table/Table.java index e5f1916c16..c7073cb58b 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/table/Table.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/table/Table.java @@ -498,4 +498,6 @@ public void shutdown() { isConnected.set(false); isTryingToConnect.set(false); } + + public abstract boolean isStateful(); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/table/record/AbstractRecordTable.java b/modules/siddhi-core/src/main/java/io/siddhi/core/table/record/AbstractRecordTable.java index d1d53ce445..e426862a79 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/table/record/AbstractRecordTable.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/table/record/AbstractRecordTable.java @@ -425,4 +425,9 @@ public SiddhiQueryContext getSiddhiQueryContext() { return siddhiQueryContext; } } + + @Override + public boolean isStateful() { + return false; + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/AbstractTrigger.java b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/AbstractTrigger.java new file mode 100644 index 0000000000..d50a29e6de --- /dev/null +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/AbstractTrigger.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.core.trigger; + +import io.siddhi.core.config.SiddhiAppContext; +import io.siddhi.core.stream.StreamJunction; +import io.siddhi.core.util.extension.holder.ExternalReferencedHolder; +import io.siddhi.query.api.definition.TriggerDefinition; + +/** + * Base class for the internal Trigger implementation + */ +public abstract class AbstractTrigger implements ExternalReferencedHolder, Trigger { + + public abstract void init(TriggerDefinition triggerDefinition, SiddhiAppContext siddhiAppContext, StreamJunction + streamJunction); + +} diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/CronTrigger.java b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/CronTrigger.java index 7e02da478c..158c9778e0 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/CronTrigger.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/CronTrigger.java @@ -43,7 +43,7 @@ /** * Implementation of {@link Trigger} which will trigger events based on a cron expression. */ -public class CronTrigger implements Trigger, Job { +public class CronTrigger extends AbstractTrigger implements Job { protected static final Logger LOG = Logger.getLogger(CronTrigger.class); @@ -155,4 +155,9 @@ private void sendEvent() { } streamJunction.sendEvent(new Event(currentTime, new Object[]{currentTime})); } + + @Override + public boolean isStateful() { + return false; + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/PeriodicTrigger.java b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/PeriodicTrigger.java index e58673765b..0365a79219 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/PeriodicTrigger.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/PeriodicTrigger.java @@ -33,7 +33,7 @@ /** * Implementation of {@link Trigger} which will trigger events based on a pre-defined period. */ -public class PeriodicTrigger implements Trigger { +public class PeriodicTrigger extends AbstractTrigger { private TriggerDefinition triggerDefinition; private SiddhiAppContext siddhiAppContext; private StreamJunction streamJunction; @@ -94,4 +94,9 @@ public void stop() { scheduledFuture.cancel(true); } } + + @Override + public boolean isStateful() { + return false; + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/StartTrigger.java b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/StartTrigger.java index 40e662512f..39a1e077bf 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/StartTrigger.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/StartTrigger.java @@ -30,7 +30,7 @@ /** * Implementation of {@link Trigger} which will trigger events when siddhi app in started. */ -public class StartTrigger implements Trigger { +public class StartTrigger extends AbstractTrigger { private TriggerDefinition triggerDefinition; private SiddhiAppContext siddhiAppContext; private StreamJunction streamJunction; @@ -83,4 +83,9 @@ public void start() { public void stop() { } + + @Override + public boolean isStateful() { + return false; + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java index 6abe6ff542..0af69ecee9 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java @@ -18,8 +18,6 @@ package io.siddhi.core.trigger; -import io.siddhi.core.config.SiddhiAppContext; -import io.siddhi.core.stream.StreamJunction; import io.siddhi.core.util.extension.holder.ExternalReferencedHolder; import io.siddhi.query.api.definition.TriggerDefinition; @@ -29,11 +27,9 @@ */ public interface Trigger extends ExternalReferencedHolder { - void init(TriggerDefinition triggerDefinition, SiddhiAppContext siddhiAppContext, StreamJunction - streamJunction); - TriggerDefinition getTriggerDefinition(); String getId(); + boolean isStateful(); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/SiddhiAppRuntimeBuilder.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/SiddhiAppRuntimeBuilder.java index 9fd8376fc4..270d18c5a6 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/SiddhiAppRuntimeBuilder.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/SiddhiAppRuntimeBuilder.java @@ -19,11 +19,14 @@ package io.siddhi.core.util; import io.siddhi.core.SiddhiAppRuntime; +import io.siddhi.core.SiddhiAppRuntimeImpl; import io.siddhi.core.aggregation.AggregationRuntime; import io.siddhi.core.config.SiddhiAppContext; import io.siddhi.core.exception.SiddhiAppCreationException; import io.siddhi.core.partition.PartitionRuntime; +import io.siddhi.core.partition.PartitionRuntimeImpl; import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.query.QueryRuntimeImpl; import io.siddhi.core.query.input.ProcessStreamReceiver; import io.siddhi.core.query.input.stream.StreamRuntime; import io.siddhi.core.query.input.stream.single.SingleStreamRuntime; @@ -161,11 +164,11 @@ public void defineAggregation(AggregationDefinition aggregationDefinition) { aggregationMap.putIfAbsent(aggregationDefinition.getId(), aggregationRuntime); } - public void addPartition(PartitionRuntime partitionRuntime) { + public void addPartition(PartitionRuntimeImpl partitionRuntime) { partitionMap.put(partitionRuntime.getPartitionName(), partitionRuntime); } - public String addQuery(QueryRuntime queryRuntime) { + public String addQuery(QueryRuntimeImpl queryRuntime) { QueryRuntime oldQueryRuntime = queryProcessorMap.put(queryRuntime.getQueryId(), queryRuntime); if (oldQueryRuntime != null) { throw new SiddhiAppCreationException("Multiple queries with name '" + queryRuntime.getQueryId() + @@ -279,9 +282,9 @@ public LockSynchronizer getLockSynchronizer() { } public SiddhiAppRuntime build() { - return new SiddhiAppRuntime(streamDefinitionMap, tableDefinitionMap, windowDefinitionMap, + return new SiddhiAppRuntimeImpl(streamDefinitionMap, tableDefinitionMap, windowDefinitionMap, aggregationDefinitionMap, inputManager, queryProcessorMap, streamJunctionMap, tableMap, windowMap, - aggregationMap, sourceMap, sinkMap, partitionMap, + aggregationMap, sourceMap, sinkMap, partitionMap, triggerMap, siddhiAppContext, siddhiAppRuntimeMap); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/PartitionParser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/PartitionParser.java index 0943e467b6..e2eb06248b 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/PartitionParser.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/PartitionParser.java @@ -24,7 +24,8 @@ import io.siddhi.core.exception.SiddhiAppCreationException; import io.siddhi.core.executor.VariableExpressionExecutor; import io.siddhi.core.partition.PartitionRuntime; -import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.partition.PartitionRuntimeImpl; +import io.siddhi.core.query.QueryRuntimeImpl; import io.siddhi.core.util.SiddhiAppRuntimeBuilder; import io.siddhi.core.util.parser.helper.QueryParserHelper; import io.siddhi.query.api.definition.AbstractDefinition; @@ -44,13 +45,13 @@ */ public class PartitionParser { - public static PartitionRuntime parse(SiddhiAppRuntimeBuilder siddhiAppRuntimeBuilder, Partition partition, - SiddhiAppContext siddhiAppContext, int queryIndex, int partitionIndex) { + public static PartitionRuntimeImpl parse(SiddhiAppRuntimeBuilder siddhiAppRuntimeBuilder, Partition partition, + SiddhiAppContext siddhiAppContext, int queryIndex, int partitionIndex) { ConcurrentMap streamDefinitionMap = siddhiAppRuntimeBuilder.getStreamDefinitionMap(); ConcurrentMap windowDefinitionMap = siddhiAppRuntimeBuilder.getWindowDefinitionMap(); - PartitionRuntime partitionRuntime = new PartitionRuntime(streamDefinitionMap, windowDefinitionMap, + PartitionRuntimeImpl partitionRuntime = new PartitionRuntimeImpl(streamDefinitionMap, windowDefinitionMap, siddhiAppRuntimeBuilder.getStreamJunctions(), partition, partitionIndex, siddhiAppContext); validateStreamPartitions(partition.getPartitionTypeMap(), streamDefinitionMap, windowDefinitionMap); for (Query query : partition.getQueryList()) { @@ -60,7 +61,7 @@ public static PartitionRuntime parse(SiddhiAppRuntimeBuilder siddhiAppRuntimeBui combinedStreamMap.putAll(streamDefinitionMap); combinedStreamMap.putAll(windowDefinitionMap); combinedStreamMap.putAll(partitionRuntime.getLocalStreamDefinitionMap()); - QueryRuntime queryRuntime = QueryParser.parse(query, siddhiAppContext, combinedStreamMap, + QueryRuntimeImpl queryRuntime = QueryParser.parse(query, siddhiAppContext, combinedStreamMap, siddhiAppRuntimeBuilder.getTableDefinitionMap(), siddhiAppRuntimeBuilder.getWindowDefinitionMap(), siddhiAppRuntimeBuilder.getAggregationDefinitionMap(), diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/QueryParser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/QueryParser.java index 0cfca87a20..94de89cad3 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/QueryParser.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/QueryParser.java @@ -28,6 +28,7 @@ import io.siddhi.core.exception.SiddhiAppCreationException; import io.siddhi.core.executor.VariableExpressionExecutor; import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.query.QueryRuntimeImpl; import io.siddhi.core.query.input.stream.StreamRuntime; import io.siddhi.core.query.input.stream.join.JoinProcessor; import io.siddhi.core.query.input.stream.join.JoinStreamRuntime; @@ -86,7 +87,7 @@ public class QueryParser { * @param partitionId The ID of the partition * @return queryRuntime */ - public static QueryRuntime parse(Query query, SiddhiAppContext siddhiAppContext, + public static QueryRuntimeImpl parse(Query query, SiddhiAppContext siddhiAppContext, Map streamDefinitionMap, Map tableDefinitionMap, Map windowDefinitionMap, @@ -96,7 +97,7 @@ public static QueryRuntime parse(Query query, SiddhiAppContext siddhiAppContext, LockSynchronizer lockSynchronizer, String queryIndex, boolean partitioned, String partitionId) { List executors = new ArrayList(); - QueryRuntime queryRuntime; + QueryRuntimeImpl queryRuntime; Element nameElement = null; LatencyTracker latencyTracker = null; LockWrapper lockWrapper = null; @@ -253,7 +254,7 @@ public static QueryRuntime parse(Query query, SiddhiAppContext siddhiAppContext, selector.setEventPopulator(StateEventPopulatorFactory.constructEventPopulator(streamRuntime .getMetaComplexEvent())); - queryRuntime = new QueryRuntime(query, streamRuntime, selector, outputRateLimiter, outputCallback, + queryRuntime = new QueryRuntimeImpl(query, streamRuntime, selector, outputRateLimiter, outputCallback, streamRuntime.getMetaComplexEvent(), siddhiQueryContext); if (outputRateLimiter instanceof WrappedSnapshotOutputRateLimiter) { diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/SiddhiAppParser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/SiddhiAppParser.java index 79c4c36d67..f04e82bb2f 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/SiddhiAppParser.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/SiddhiAppParser.java @@ -21,8 +21,8 @@ import io.siddhi.core.config.SiddhiAppContext; import io.siddhi.core.config.SiddhiContext; import io.siddhi.core.exception.SiddhiAppCreationException; -import io.siddhi.core.partition.PartitionRuntime; -import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.partition.PartitionRuntimeImpl; +import io.siddhi.core.query.QueryRuntimeImpl; import io.siddhi.core.stream.StreamJunction; import io.siddhi.core.util.ExceptionUtil; import io.siddhi.core.util.IdGenerator; @@ -85,6 +85,7 @@ public static SiddhiAppRuntimeBuilder parse(SiddhiApp siddhiApp, String siddhiAp SiddhiAppContext siddhiAppContext = new SiddhiAppContext(); siddhiAppContext.setSiddhiContext(siddhiContext); siddhiAppContext.setSiddhiAppString(siddhiAppString); + siddhiAppContext.setSiddhiApp(siddhiApp); try { Element element = AnnotationHelper.getAnnotationElement(SiddhiConstants.ANNOTATION_NAME, null, @@ -250,7 +251,7 @@ public static SiddhiAppRuntimeBuilder parse(SiddhiApp siddhiApp, String siddhiAp for (ExecutionElement executionElement : siddhiApp.getExecutionElementList()) { if (executionElement instanceof Query) { try { - QueryRuntime queryRuntime = QueryParser.parse((Query) executionElement, siddhiAppContext, + QueryRuntimeImpl queryRuntime = QueryParser.parse((Query) executionElement, siddhiAppContext, siddhiAppRuntimeBuilder.getStreamDefinitionMap(), siddhiAppRuntimeBuilder.getTableDefinitionMap(), siddhiAppRuntimeBuilder.getWindowDefinitionMap(), @@ -269,7 +270,7 @@ public static SiddhiAppRuntimeBuilder parse(SiddhiApp siddhiApp, String siddhiAp } } else { try { - PartitionRuntime partitionRuntime = PartitionParser.parse(siddhiAppRuntimeBuilder, + PartitionRuntimeImpl partitionRuntime = PartitionParser.parse(siddhiAppRuntimeBuilder, (Partition) executionElement, siddhiAppContext, queryIndex, partitionIndex); siddhiAppRuntimeBuilder.addPartition(partitionRuntime); queryIndex += ((Partition) executionElement).getQueryList().size(); diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java index 6669d22378..fd0c320ec1 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java @@ -43,6 +43,7 @@ import io.siddhi.core.table.Table; import io.siddhi.core.table.record.RecordTableHandler; import io.siddhi.core.table.record.RecordTableHandlerManager; +import io.siddhi.core.trigger.AbstractTrigger; import io.siddhi.core.trigger.CronTrigger; import io.siddhi.core.trigger.PeriodicTrigger; import io.siddhi.core.trigger.StartTrigger; @@ -291,7 +292,7 @@ public static void addEventTrigger(TriggerDefinition triggerDefinition, ConcurrentMap streamJunctionMap, SiddhiAppContext siddhiAppContext) { if (!eventTriggerMap.containsKey(triggerDefinition.getId())) { - Trigger trigger; + AbstractTrigger trigger; if (triggerDefinition.getAtEvery() != null) { trigger = new PeriodicTrigger(); } else if (triggerDefinition.getAt().trim().equalsIgnoreCase(SiddhiConstants.TRIGGER_START)) { diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/window/Window.java b/modules/siddhi-core/src/main/java/io/siddhi/core/window/Window.java index d87109a2b2..7dbbc47985 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/window/Window.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/window/Window.java @@ -364,4 +364,8 @@ public void setToLast(Processor processor) { } } + + public boolean isStateful() { + return internalWindowProcessor.isStateful(); + } } diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SnapshotableEventQueueTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SnapshotableEventQueueTestCase.java index f154ead5b7..670aa3a740 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SnapshotableEventQueueTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SnapshotableEventQueueTestCase.java @@ -139,7 +139,7 @@ public void incrementalPersistenceTest1() throws InterruptedException, IOExcepti } Snapshot snapshot4 = snapshotableStreamEventQueue.getSnapshot(); - streamEvents = (StreamEvent) snapshot1.getState(); + streamEvents = (StreamEvent) snapshot4.getState(); Assert.assertTrue(streamEvents != null); snapshots = new HashMap<>(); snapshots.put(6L, toString(snapshot4)); @@ -151,7 +151,7 @@ public void incrementalPersistenceTest1() throws InterruptedException, IOExcepti snapshotStateList.putSnapshotState(entry.getKey(), (Snapshot) fromString(entry.getValue())); } snapshotableStreamEventQueue3.restore(snapshotStateList); - + snapshotableStreamEventQueue.reset(); Assert.assertEquals(snapshotableStreamEventQueue, snapshotableStreamEventQueue3); } diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/StateTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/StateTestCase.java new file mode 100644 index 0000000000..6f4f1fd9f6 --- /dev/null +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/StateTestCase.java @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.core.managment; + +import io.siddhi.core.SiddhiAppRuntime; +import io.siddhi.core.SiddhiManager; +import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.table.Table; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class StateTestCase { + private static final Logger log = Logger.getLogger(StateTestCase.class); + private int inEventCount; + private int removeEventCount; + private boolean eventArrived; + + @BeforeMethod + public void init() { + inEventCount = 0; + removeEventCount = 0; + eventArrived = false; + } + + @Test + public void stateTest1() throws InterruptedException { + log.info("stateTest1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String app = "" + + "define stream cseEventStream (symbol string, price float, volume int);" + + "" + + "@info(name = 'query1') " + + "from cseEventStream " + + "select * " + + "insert all events into outputStream;" + + "" + + "@info(name = 'query2') " + + "from cseEventStream#window.timeBatch(1 sec) " + + "select * " + + "insert all events into outputStream;" + + "" + + "@info(name = 'query3') " + + "from cseEventStream " + + "select sum(price) as total " + + "insert all events into outputStream1;" + + "" + + "@info(name = 'query4') " + + "from cseEventStream " + + "select * " + + "output every 5 min " + + "insert all events into outputStream;" + + "" + + ""; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + + for (QueryRuntime queryRuntime : siddhiAppRuntime.getQueries()) { + inEventCount++; + switch (inEventCount) { + case 1: + Assert.assertFalse(queryRuntime.isStateful()); + break; + case 2: + Assert.assertTrue(queryRuntime.isStateful()); + break; + case 3: + Assert.assertTrue(queryRuntime.isStateful()); + break; + case 4: + Assert.assertTrue(queryRuntime.isStateful()); + break; + } + } + Assert.assertEquals(inEventCount, 4); + siddhiAppRuntime.shutdown(); + + } + + @Test + public void stateTest2() throws InterruptedException { + log.info("stateTest2"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String app = "" + + "define stream cseEventStream (symbol string, price float, volume int);" + + "" + + "partition with (symbol of cseEventStream) " + + "begin " + + "" + + " @info(name = 'query1') " + + " from cseEventStream " + + " select * " + + " insert all events into outputStream;" + + "" + + " @info(name = 'query2') " + + " from cseEventStream#window.timeBatch(1 sec) " + + " select * " + + " insert all events into outputStream;" + + "" + + " @info(name = 'query3') " + + " from cseEventStream " + + " select sum(price) as total " + + " insert all events into outputStream1;" + + "" + + " @info(name = 'query4') " + + " from cseEventStream " + + " select * " + + " output every 5 min " + + " insert all events into outputStream;" + + "" + + "end " + + ""; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + + for (QueryRuntime queryRuntime : siddhiAppRuntime.getPartitions().iterator().next().getQueries()) { + inEventCount++; + switch (inEventCount) { + case 1: + Assert.assertFalse(queryRuntime.isStateful()); + break; + case 2: + Assert.assertTrue(queryRuntime.isStateful()); + break; + case 3: + Assert.assertTrue(queryRuntime.isStateful()); + break; + case 4: + Assert.assertTrue(queryRuntime.isStateful()); + break; + } + } + Assert.assertEquals(inEventCount, 4); + siddhiAppRuntime.shutdown(); + } + + @Test + public void stateTest3() throws InterruptedException { + log.info("stateTest3"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String app = "" + + "" + + "define table cseEventTable (symbol string, price float, volume int);" + + "" + + "@Store(type='testStoreContainingInMemoryTable') " + + "define table StockTable (symbol string, price float, volume long); " + + "" + + "define window cseEventWindow (symbol string, price float, volume int) time(1 sec) " + + " output all events; " + + "" + + "define trigger triggerStream at every 500 milliseconds ;" + + "" + + "define stream cseEventStream (symbol string, price float, volume int);" + + "define stream twitterStream (user string, tweet string, company string); " + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); " + + "" + + "@info(name = 'query1') " + + "from cseEventStream#window.time(1 sec) join twitterStream#window.time(1 sec) " + + "on cseEventStream.symbol== twitterStream.company " + + "select cseEventStream.symbol as symbol, twitterStream.tweet, cseEventStream.price " + + "insert all events into outputStream ;" + + "" + + "@info(name = 'query2') " + + "from every ( e1=Stream1[price > 20] -> e2=Stream2[price > e1.price] " + + " or e3=Stream2['IBM' == symbol]) -> e4=Stream2[price > e1.price] " + + "select e1.price as price1, e2.price as price2, e3.price as price3, e4.price as price4 " + + "insert into OutputStream ;" + + ""; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + + for (QueryRuntime queryRuntime : siddhiAppRuntime.getQueries()) { + inEventCount++; + switch (inEventCount) { + case 1: + Assert.assertTrue(queryRuntime.isStateful()); + break; + case 2: + Assert.assertTrue(queryRuntime.isStateful()); + break; + } + } + + for (Table table : siddhiAppRuntime.getTables()) { + inEventCount++; + switch (inEventCount) { + case 3: + Assert.assertTrue(table.isStateful()); + break; + case 4: + Assert.assertFalse(table.isStateful()); + break; + } + } + + inEventCount++; + Assert.assertTrue(siddhiAppRuntime.getWindows().iterator().next().isStateful()); + + inEventCount++; + Assert.assertFalse(siddhiAppRuntime.getTiggers().iterator().next().isStateful()); + + Assert.assertEquals(inEventCount, 6); + siddhiAppRuntime.shutdown(); + + } +} + diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/stream/event/EventTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/stream/event/EventTestCase.java index b4ae47cfa8..6fbcbb2a91 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/stream/event/EventTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/stream/event/EventTestCase.java @@ -38,7 +38,7 @@ import io.siddhi.core.executor.condition.compare.greaterthan.GreaterThanCompareConditionExpressionExecutorIntInt; import io.siddhi.core.executor.condition.compare.lessthan.LessThanCompareConditionExpressionExecutorFloatFloat; import io.siddhi.core.executor.math.add.AddExpressionExecutorFloat; -import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.query.QueryRuntimeImpl; import io.siddhi.core.query.input.stream.single.SingleStreamRuntime; import io.siddhi.core.stream.input.source.Source; import io.siddhi.core.stream.output.sink.Sink; @@ -277,7 +277,7 @@ public void testQueryParser() { context.setSiddhiContext(siddhicontext); context.setIdGenerator(new IdGenerator()); context.setSnapshotService(new SnapshotService(context)); - QueryRuntime runtime = QueryParser.parse(query, context, streamDefinitionMap, tableDefinitionMap, + QueryRuntimeImpl runtime = QueryParser.parse(query, context, streamDefinitionMap, tableDefinitionMap, windowDefinitionMap, aggregationDefinitionMap, tableMap, aggregationMap, eventWindowMap, lockSynchronizer, "1", false, SiddhiConstants.PARTITION_ID_DEFAULT); AssertJUnit.assertNotNull(runtime); diff --git a/modules/siddhi-core/src/test/resources/testng.xml b/modules/siddhi-core/src/test/resources/testng.xml index f08b4a18e6..e7be813521 100644 --- a/modules/siddhi-core/src/test/resources/testng.xml +++ b/modules/siddhi-core/src/test/resources/testng.xml @@ -33,12 +33,15 @@ - + + + +