From d54bbf9aac389fa909714f3fb9ea462ead0895a4 Mon Sep 17 00:00:00 2001 From: Chiran Date: Thu, 10 Oct 2019 12:48:36 +0530 Subject: [PATCH 1/3] Start Triggers after the system has started --- .../java/io/siddhi/core/SiddhiAppRuntimeImpl.java | 5 ++++- .../io/siddhi/core/config/SiddhiAppContext.java | 11 +++++++++++ .../core/stream/output/sink/InMemorySink.java | 4 ++-- .../siddhi/core/stream/output/sink/LogSink.java | 4 ++-- .../io/siddhi/core/stream/output/sink/Sink.java | 4 ++-- .../main/java/io/siddhi/core/trigger/Trigger.java | 15 +++++++++++++-- .../parser/helper/DefinitionParserHelper.java | 2 +- 7 files changed, 35 insertions(+), 10 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java index 1492695eca..753ed52990 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiAppRuntimeImpl.java @@ -462,7 +462,10 @@ public synchronized void startWithoutSources() { aggregationRuntime.startPurging(); } } - + for (Trigger trigger : + siddhiAppContext.getTriggerHolders()) { + trigger.start(); + } inputManager.connect(); runningWithoutSources = true; diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java b/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java index 7076bb83e1..b269e2043c 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/config/SiddhiAppContext.java @@ -20,6 +20,7 @@ import com.lmax.disruptor.ExceptionHandler; import io.siddhi.core.function.Script; +import io.siddhi.core.trigger.Trigger; import io.siddhi.core.util.IdGenerator; import io.siddhi.core.util.Scheduler; import io.siddhi.core.util.SiddhiConstants; @@ -61,6 +62,7 @@ public class SiddhiAppContext { private ExecutorService executorService; private ScheduledExecutorService scheduledExecutorService; private List externalReferencedHolders; + private List triggerHolders; private SnapshotService snapshotService; private ThreadBarrier threadBarrier = null; @@ -80,6 +82,7 @@ public class SiddhiAppContext { public SiddhiAppContext() { this.externalReferencedHolders = Collections.synchronizedList(new LinkedList<>()); + this.triggerHolders = Collections.synchronizedList(new LinkedList<>()); this.scriptFunctionMap = new HashMap(); this.schedulerList = new ArrayList(); this.rootMetricsLevel = Level.OFF; @@ -186,6 +189,14 @@ public List getExternalReferencedHolders() { return Collections.unmodifiableList(new ArrayList<>(externalReferencedHolders)); } + public List getTriggerHolders() { + return Collections.unmodifiableList(new ArrayList<>(triggerHolders)); + } + + public void addTrigger(Trigger trigger) { + triggerHolders.add(trigger); + } + public ThreadBarrier getThreadBarrier() { return threadBarrier; } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/InMemorySink.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/InMemorySink.java index 5a303a8e9a..dd98dbd873 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/InMemorySink.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/InMemorySink.java @@ -90,12 +90,12 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, Opti @Override public void connect() throws ConnectionUnavailableException { - // do nothing + super.setConnected(true); } @Override public void disconnect() { - // do nothing + super.setConnected(false); } @Override diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/LogSink.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/LogSink.java index 9c2090f0d4..2f21cedb8a 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/LogSink.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/LogSink.java @@ -169,12 +169,12 @@ private void logMessage(LogSink.LogPriority logPriority, String message) { @Override public void connect() throws ConnectionUnavailableException { - // do nothing + super.setConnected(true); } @Override public void disconnect() { - // do nothing + super.setConnected(false); } @Override diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java index fbe07c10d4..f93a9b90e4 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java @@ -212,8 +212,8 @@ public final void publish(Object payload) { private void connectAndPublish(Object payload, DynamicOptions dynamicOptions, S state) throws ConnectionUnavailableException { connect(); - publish(payload, dynamicOptions, state); setConnected(true); + publish(payload, dynamicOptions, state); if (connectionCallback != null) { connectionCallback.connectionEstablished(); } @@ -368,7 +368,7 @@ public void onError(Object payload, DynamicOptions dynamicOptions, Exception e) try { switch (errorAction) { case STREAM: - connectWithRetry(); + connectWithRetry(); streamJunction.handleError(dynamicOptions.getEvent(), e); break; case WAIT: diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java index 0af69ecee9..879f299834 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/trigger/Trigger.java @@ -18,14 +18,25 @@ package io.siddhi.core.trigger; -import io.siddhi.core.util.extension.holder.ExternalReferencedHolder; import io.siddhi.query.api.definition.TriggerDefinition; /** * Interface class to represent event triggers. Event triggers are used to trigger events within Siddhi itself * according to a user given criteria. */ -public interface Trigger extends ExternalReferencedHolder { +public interface Trigger { + + /** + * This will be called only once. + * This will be called after initializing the system. + */ + void start(); + + /** + * This will be called only once. + * This will be called before shutting down the system. + */ + void stop(); TriggerDefinition getTriggerDefinition(); diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java index 27c180a932..3538eb2c37 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/helper/DefinitionParserHelper.java @@ -302,7 +302,7 @@ public static void addEventTrigger(TriggerDefinition triggerDefinition, } StreamJunction streamJunction = streamJunctionMap.get(triggerDefinition.getId()); trigger.init(triggerDefinition, siddhiAppContext, streamJunction); - siddhiAppContext.addEternalReferencedHolder(trigger); + siddhiAppContext.addTrigger(trigger); eventTriggerMap.putIfAbsent(trigger.getId(), trigger); } } From fc8d55c2d4c1385319ac841f9107d7260d99c966 Mon Sep 17 00:00:00 2001 From: Chiran Date: Thu, 10 Oct 2019 12:53:40 +0530 Subject: [PATCH 2/3] Revert setting connected bool to true in connect() --- .../java/io/siddhi/core/stream/output/sink/InMemorySink.java | 4 ++-- .../main/java/io/siddhi/core/stream/output/sink/LogSink.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/InMemorySink.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/InMemorySink.java index dd98dbd873..5a303a8e9a 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/InMemorySink.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/InMemorySink.java @@ -90,12 +90,12 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, Opti @Override public void connect() throws ConnectionUnavailableException { - super.setConnected(true); + // do nothing } @Override public void disconnect() { - super.setConnected(false); + // do nothing } @Override diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/LogSink.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/LogSink.java index 2f21cedb8a..9c2090f0d4 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/LogSink.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/LogSink.java @@ -169,12 +169,12 @@ private void logMessage(LogSink.LogPriority logPriority, String message) { @Override public void connect() throws ConnectionUnavailableException { - super.setConnected(true); + // do nothing } @Override public void disconnect() { - super.setConnected(false); + // do nothing } @Override From 24a0103179e5791bd7a6ce22c3bc74c2ec72b70d Mon Sep 17 00:00:00 2001 From: Chiran Date: Thu, 10 Oct 2019 12:55:23 +0530 Subject: [PATCH 3/3] Revert formatting changes --- .../src/main/java/io/siddhi/core/stream/output/sink/Sink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java index f93a9b90e4..f785b5f914 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/output/sink/Sink.java @@ -368,7 +368,7 @@ public void onError(Object payload, DynamicOptions dynamicOptions, Exception e) try { switch (errorAction) { case STREAM: - connectWithRetry(); + connectWithRetry(); streamJunction.handleError(dynamicOptions.getEvent(), e); break; case WAIT: