Skip to content

Commit

Permalink
Merge pull request #1543 from pcnfernando/master
Browse files Browse the repository at this point in the history
Avoid triggers to start before other elements
  • Loading branch information
mohanvive authored Oct 10, 2019
2 parents 2f0e800 + 24a0103 commit 004ae9c
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,10 @@ public synchronized void startWithoutSources() {
aggregationRuntime.startPurging();
}
}

for (Trigger trigger :
siddhiAppContext.getTriggerHolders()) {
trigger.start();
}
inputManager.connect();

runningWithoutSources = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.lmax.disruptor.ExceptionHandler;
import io.siddhi.core.function.Script;
import io.siddhi.core.trigger.Trigger;
import io.siddhi.core.util.IdGenerator;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.SiddhiConstants;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class SiddhiAppContext {
private ExecutorService executorService;
private ScheduledExecutorService scheduledExecutorService;
private List<ExternalReferencedHolder> externalReferencedHolders;
private List<Trigger> triggerHolders;
private SnapshotService snapshotService;

private ThreadBarrier threadBarrier = null;
Expand All @@ -80,6 +82,7 @@ public class SiddhiAppContext {

public SiddhiAppContext() {
this.externalReferencedHolders = Collections.synchronizedList(new LinkedList<>());
this.triggerHolders = Collections.synchronizedList(new LinkedList<>());
this.scriptFunctionMap = new HashMap<String, Script>();
this.schedulerList = new ArrayList<Scheduler>();
this.rootMetricsLevel = Level.OFF;
Expand Down Expand Up @@ -186,6 +189,14 @@ public List<ExternalReferencedHolder> getExternalReferencedHolders() {
return Collections.unmodifiableList(new ArrayList<>(externalReferencedHolders));
}

public List<Trigger> getTriggerHolders() {
return Collections.unmodifiableList(new ArrayList<>(triggerHolders));
}

public void addTrigger(Trigger trigger) {
triggerHolders.add(trigger);
}

public ThreadBarrier getThreadBarrier() {
return threadBarrier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ public final void publish(Object payload) {
private void connectAndPublish(Object payload, DynamicOptions dynamicOptions, S state)
throws ConnectionUnavailableException {
connect();
publish(payload, dynamicOptions, state);
setConnected(true);
publish(payload, dynamicOptions, state);
if (connectionCallback != null) {
connectionCallback.connectionEstablished();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,25 @@

package io.siddhi.core.trigger;

import io.siddhi.core.util.extension.holder.ExternalReferencedHolder;
import io.siddhi.query.api.definition.TriggerDefinition;

/**
* Interface class to represent event triggers. Event triggers are used to trigger events within Siddhi itself
* according to a user given criteria.
*/
public interface Trigger extends ExternalReferencedHolder {
public interface Trigger {

/**
* This will be called only once.
* This will be called after initializing the system.
*/
void start();

/**
* This will be called only once.
* This will be called before shutting down the system.
*/
void stop();

TriggerDefinition getTriggerDefinition();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public static void addEventTrigger(TriggerDefinition triggerDefinition,
}
StreamJunction streamJunction = streamJunctionMap.get(triggerDefinition.getId());
trigger.init(triggerDefinition, siddhiAppContext, streamJunction);
siddhiAppContext.addEternalReferencedHolder(trigger);
siddhiAppContext.addTrigger(trigger);
eventTriggerMap.putIfAbsent(trigger.getId(), trigger);
}
}
Expand Down

0 comments on commit 004ae9c

Please sign in to comment.