diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java index 1492695eca..753ed52990 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java @@ -462,7 +462,10 @@ public synchronized void startWithoutSources() { aggregationRuntime.startPurging(); } } - + for (Trigger trigger : + siddhiAppContext.getTriggerHolders()) { + trigger.start(); + } inputManager.connect(); runningWithoutSources = true; diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java b/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java index 7076bb83e1..b269e2043c 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java @@ -20,6 +20,7 @@ import com.lmax.disruptor.ExceptionHandler; import io.siddhi.core.function.Script; +import io.siddhi.core.trigger.Trigger; import io.siddhi.core.util.IdGenerator; import io.siddhi.core.util.Scheduler; import io.siddhi.core.util.SiddhiConstants; @@ -61,6 +62,7 @@ public class SiddhiAppContext { private ExecutorService executorService; private ScheduledExecutorService scheduledExecutorService; private List externalReferencedHolders; + private List triggerHolders; private SnapshotService snapshotService; private ThreadBarrier threadBarrier = null; @@ -80,6 +82,7 @@ public class SiddhiAppContext { public SiddhiAppContext() { this.externalReferencedHolders = Collections.synchronizedList(new LinkedList<>()); + this.triggerHolders = Collections.synchronizedList(new LinkedList<>()); this.scriptFunctionMap = new HashMap(); this.schedulerList = new ArrayList(); this.rootMetricsLevel = Level.OFF; @@ -186,6 +189,14 @@ public List getExternalReferencedHolders() { return Collections.unmodifiableList(new ArrayList<>(externalReferencedHolders)); } + public List getTriggerHolders() { + return Collections.unmodifiableList(new ArrayList<>(triggerHolders)); + } + + public void addTrigger(Trigger trigger) { + triggerHolders.add(trigger); + } + public ThreadBarrier getThreadBarrier() { return threadBarrier; } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java index fbe07c10d4..f785b5f914 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java @@ -212,8 +212,8 @@ public final void publish(Object payload) { private void connectAndPublish(Object payload, DynamicOptions dynamicOptions, S state) throws ConnectionUnavailableException { connect(); - publish(payload, dynamicOptions, state); setConnected(true); + publish(payload, dynamicOptions, state); if (connectionCallback != null) { connectionCallback.connectionEstablished(); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java index 0af69ecee9..879f299834 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java @@ -18,14 +18,25 @@ package io.siddhi.core.trigger; -import io.siddhi.core.util.extension.holder.ExternalReferencedHolder; import io.siddhi.query.api.definition.TriggerDefinition; /** * Interface class to represent event triggers. Event triggers are used to trigger events within Siddhi itself * according to a user given criteria. */ -public interface Trigger extends ExternalReferencedHolder { +public interface Trigger { + + /** + * This will be called only once. + * This will be called after initializing the system. + */ + void start(); + + /** + * This will be called only once. + * This will be called before shutting down the system. + */ + void stop(); TriggerDefinition getTriggerDefinition(); diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java index 27c180a932..3538eb2c37 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java @@ -302,7 +302,7 @@ public static void addEventTrigger(TriggerDefinition triggerDefinition, } StreamJunction streamJunction = streamJunctionMap.get(triggerDefinition.getId()); trigger.init(triggerDefinition, siddhiAppContext, streamJunction); - siddhiAppContext.addEternalReferencedHolder(trigger); + siddhiAppContext.addTrigger(trigger); eventTriggerMap.putIfAbsent(trigger.getId(), trigger); } }