Skip to content

Commit

Permalink
Support every containing multiple patterns elements with within
Browse files Browse the repository at this point in the history
Removes duplicate events and supports patterns expiry.
Fixes siddhi-io#1425
Fixes siddhi-io#1182
  • Loading branch information
suhothayan committed Aug 6, 2019
1 parent 8a2c150 commit b738a15
Show file tree
Hide file tree
Showing 20 changed files with 909 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void receive(ComplexEvent complexEvent) {
while (aComplexEvent != null) {
try {
multiProcessReturn.set(new ReturnEventHolder());
stabilizeStates();
stabilizeStates(aComplexEvent.getTimestamp());
for (int anEventSequence : eventSequence) {
StreamEventConverter aStreamEventConverter = streamEventConverters[anEventSequence];
StreamEventFactory aStreamEventFactory = streamEventFactorys[anEventSequence];
Expand Down Expand Up @@ -130,7 +130,7 @@ public void receive(Event event) {
try {
multiProcessReturn.set(new ReturnEventHolder());

stabilizeStates();
stabilizeStates(event.getTimestamp());
for (int anEventSequence : eventSequence) {
StreamEventConverter aStreamEventConverter = streamEventConverters[anEventSequence];
StreamEventFactory aStreamEventFactory = streamEventFactorys[anEventSequence];
Expand Down Expand Up @@ -159,7 +159,7 @@ public void receive(Event[] events) {
for (Event event : events) {
try {
multiProcessReturn.set(new ReturnEventHolder());
stabilizeStates();
stabilizeStates(event.getTimestamp());
for (int anEventSequence : eventSequence) {
StreamEventConverter aStreamEventConverter = streamEventConverters[anEventSequence];
StreamEventFactory aStreamEventFactory = streamEventFactorys[anEventSequence];
Expand Down Expand Up @@ -189,7 +189,7 @@ public void receive(List<Event> events) {
for (Event event : events) {
try {
multiProcessReturn.set(new ReturnEventHolder());
stabilizeStates();
stabilizeStates(event.getTimestamp());
for (int anEventSequence : eventSequence) {
StreamEventConverter aStreamEventConverter = streamEventConverters[anEventSequence];
StreamEventFactory aStreamEventFactory = streamEventFactorys[anEventSequence];
Expand Down Expand Up @@ -218,7 +218,7 @@ public void receive(long timestamp, Object[] data) {
synchronized (patternSyncObject) {
try {
multiProcessReturn.set(new ReturnEventHolder());
stabilizeStates();
stabilizeStates(timestamp);
for (int anEventSequence : eventSequence) {
StreamEventConverter aStreamEventConverter = streamEventConverters[anEventSequence];
StreamEventFactory aStreamEventFactory = streamEventFactorys[anEventSequence];
Expand Down Expand Up @@ -246,7 +246,7 @@ protected void processAndClear(int processIndex, StreamEvent streamEvent) {
nextProcessors[processIndex].process(currentStreamEventChunk);
}

protected void stabilizeStates() {
protected void stabilizeStates(long timestamp) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ public class ProcessStreamReceiver implements StreamJunction.Receiver {
protected final SiddhiQueryContext siddhiQueryContext;
protected String streamId;
protected Processor next;
protected List<PreStateProcessor> stateProcessors = new ArrayList<PreStateProcessor>();
protected int stateProcessorsSize;
protected List<PreStateProcessor> stateProcessorsForStream = new ArrayList<PreStateProcessor>();
protected int stateProcessorsForStreamSize;
protected LockWrapper lockWrapper;
protected boolean batchProcessingAllowed;
private StreamEventConverter streamEventConverter;
private MetaStreamEvent metaStreamEvent;
private StreamEventFactory streamEventFactory;
private SiddhiDebugger siddhiDebugger;
protected List<PreStateProcessor> allStateProcessors = new ArrayList<PreStateProcessor>();
protected int allStateProcessorsSize;

public ProcessStreamReceiver(String streamId,
SiddhiQueryContext siddhiQueryContext) {
Expand Down Expand Up @@ -211,8 +213,14 @@ public void init() {
streamEventConverter = StreamEventConverterFactory.constructEventConverter(metaStreamEvent);
}

public void addStatefulProcessor(PreStateProcessor stateProcessor) {
stateProcessors.add(stateProcessor);
stateProcessorsSize = stateProcessors.size();
public void addStatefulProcessorForStream(PreStateProcessor stateProcessor) {
stateProcessorsForStream.add(stateProcessor);
stateProcessorsForStreamSize = stateProcessorsForStream.size();
}

public void setAllStatefulProcessors(List<PreStateProcessor> allStateProcessors) {
this.allStateProcessors = allStateProcessors;
this.allStateProcessorsSize = allStateProcessors.size();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected void processAndClear(ComplexEventChunk<StreamEvent> streamEventChunk)
while (streamEventChunk.hasNext()) {
StreamEvent streamEvent = streamEventChunk.next();
streamEventChunk.remove();
stabilizeStates();
stabilizeStates(streamEvent.getTimestamp());
currentStreamEventChunk.add(streamEvent);
ComplexEventChunk<StateEvent> eventChunk = ((StreamPreStateProcessor) next).
processAndReturn(currentStreamEventChunk);
Expand All @@ -72,7 +72,7 @@ protected void processAndClear(ComplexEventChunk<StreamEvent> streamEventChunk)
}
}

protected void stabilizeStates() {
protected void stabilizeStates(long timestamp) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.siddhi.core.query.input.stream.state;

import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
Expand Down Expand Up @@ -98,6 +99,7 @@ protected void addState(StateEvent stateEvent, StreamPreState state) {
public void addEveryState(StateEvent stateEvent) {

StateEvent clonedEvent = stateEventCloner.copyStateEvent(stateEvent);
clonedEvent.setType(ComplexEvent.Type.CURRENT);
if (clonedEvent.getStreamEvent(stateId) != null) {
// Set the timestamp of the last arrived event
clonedEvent.setTimestamp(clonedEvent.getStreamEvent(stateId).getTimestamp());
Expand Down Expand Up @@ -259,15 +261,9 @@ public ComplexEventChunk<StateEvent> processAndReturn(ComplexEventChunk complexE
StreamEvent streamEvent = (StreamEvent) complexEventChunk.next(); //Sure only one will be sent
this.lock.lock();
try {
StateEvent expiredStateEvent = null;
for (Iterator<StateEvent> iterator = state.getPendingStateEventList().iterator();
iterator.hasNext(); ) {
StateEvent stateEvent = iterator.next();
if (isExpired(stateEvent, streamEvent.getTimestamp())) {
iterator.remove();
expiredStateEvent = stateEvent;
continue;
}
if (logicalType == LogicalStateElement.Type.OR &&
stateEvent.getStreamEvent(partnerStatePreProcessor.getStateId()) != null) {
iterator.remove();
Expand Down Expand Up @@ -302,10 +298,6 @@ public ComplexEventChunk<StateEvent> processAndReturn(ComplexEventChunk complexE
}
}
}
if (expiredStateEvent != null && withinEveryPreStateProcessor != null) {
withinEveryPreStateProcessor.addEveryState(expiredStateEvent);
withinEveryPreStateProcessor.updateState();
}
} finally {
this.lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.siddhi.core.query.input.stream.state;

import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.util.Scheduler;
Expand Down Expand Up @@ -106,6 +107,10 @@ public void addEveryState(StateEvent stateEvent) {
lock.lock();
try {
StateEvent clonedEvent = stateEventCloner.copyStateEvent(stateEvent);
clonedEvent.setType(ComplexEvent.Type.CURRENT);
for (int i = stateId; i < clonedEvent.getStreamEvents().length; i++) {
clonedEvent.setEvent(i, null);
}
state.getNewAndEveryStateEventList().add(clonedEvent);
// Start the scheduler
state.lastScheduledTime = stateEvent.getTimestamp() + waitingTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.siddhi.core.query.input.stream.state;

import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
Expand Down Expand Up @@ -55,15 +56,9 @@ public ComplexEventChunk<StateEvent> processAndReturn(ComplexEventChunk complexE
StreamEvent streamEvent = (StreamEvent) complexEventChunk.next(); //Sure only one will be sent
CountStreamPreState state = (CountStreamPreState) stateHolder.getState();
lock.lock();
StateEvent expiredStateEvent = null;
try {
for (Iterator<StateEvent> iterator = state.getPendingStateEventList().iterator(); iterator.hasNext(); ) {
StateEvent stateEvent = iterator.next();
if (isExpired(stateEvent, streamEvent.getTimestamp())) {
iterator.remove();
expiredStateEvent = stateEvent;
continue;
}
if (removeIfNextStateProcessed(stateEvent, iterator, stateId + 1)) {
continue;
}
Expand Down Expand Up @@ -92,10 +87,6 @@ public ComplexEventChunk<StateEvent> processAndReturn(ComplexEventChunk complexE
}
}
}
if (expiredStateEvent != null && withinEveryPreStateProcessor != null) {
withinEveryPreStateProcessor.addEveryState(expiredStateEvent);
withinEveryPreStateProcessor.updateState();
}
} finally {
lock.unlock();
stateHolder.returnState(state);
Expand Down Expand Up @@ -151,9 +142,12 @@ public void addEveryState(StateEvent stateEvent) {
lock.lock();
try {
StateEvent clonedEvent = stateEventCloner.copyStateEvent(stateEvent);
clonedEvent.setType(ComplexEvent.Type.CURRENT);
for (int i = stateId; i < clonedEvent.getStreamEvents().length; i++) {
clonedEvent.setEvent(i, null);
}
StreamPreState state = stateHolder.getState();
try {
clonedEvent.setEvent(stateId, null);
state.getNewAndEveryStateEventList().add(clonedEvent);
} finally {
stateHolder.returnState(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.siddhi.core.query.input.stream.state;

import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
Expand Down Expand Up @@ -63,7 +64,11 @@ protected void addState(StateEvent stateEvent, StreamPreState state) {
@Override
public void addEveryState(StateEvent stateEvent) {
StateEvent clonedEvent = stateEventCloner.copyStateEvent(stateEvent);
clonedEvent.setType(ComplexEvent.Type.CURRENT);
clonedEvent.setEvent(stateId, null);
for (int i = stateId; i < clonedEvent.getStreamEvents().length; i++) {
clonedEvent.setEvent(i, null);
}
StreamPreState state = stateHolder.getState();
lock.lock();
try {
Expand Down Expand Up @@ -109,6 +114,7 @@ public void updateState() {
StreamPreState state = stateHolder.getState();
lock.lock();
try {
state.getNewAndEveryStateEventList().sort(eventTimeComparator);
state.getPendingStateEventList().addAll(state.getNewAndEveryStateEventList());
state.getNewAndEveryStateEventList().clear();
partnerStatePreProcessor.moveAllNewAndEveryStateEventListEventsToPendingStateEventList();
Expand All @@ -126,14 +132,8 @@ public ComplexEventChunk<StateEvent> processAndReturn(ComplexEventChunk complexE
StreamPreState state = stateHolder.getState();
lock.lock();
try {
StateEvent expiredStateEvent = null;
for (Iterator<StateEvent> iterator = state.getPendingStateEventList().iterator(); iterator.hasNext(); ) {
StateEvent stateEvent = iterator.next();
if (isExpired(stateEvent, streamEvent.getTimestamp())) {
iterator.remove();
expiredStateEvent = stateEvent;
continue;
}
if (logicalType == LogicalStateElement.Type.OR &&
stateEvent.getStreamEvent(partnerStatePreProcessor.getStateId()) != null) {
iterator.remove();
Expand All @@ -159,10 +159,6 @@ public ComplexEventChunk<StateEvent> processAndReturn(ComplexEventChunk complexE
}
}
}
if (expiredStateEvent != null && withinEveryPreStateProcessor != null) {
withinEveryPreStateProcessor.addEveryState(expiredStateEvent);
withinEveryPreStateProcessor.updateState();
}
} finally {
lock.unlock();
stateHolder.returnState(state);
Expand All @@ -178,6 +174,7 @@ public void setPartnerStatePreProcessor(LogicalPreStateProcessor partnerStatePre
public void moveAllNewAndEveryStateEventListEventsToPendingStateEventList() {
StreamPreState state = stateHolder.getState();
try {
state.getNewAndEveryStateEventList().sort(eventTimeComparator);
state.getPendingStateEventList().addAll(state.getNewAndEveryStateEventList());
state.getNewAndEveryStateEventList().clear();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public interface PreStateProcessor extends Processor {

void updateState();

void expireEvents(long timestamp);

StreamPostStateProcessor getThisStatePostProcessor();

void resetState();
Expand Down
Loading

0 comments on commit b738a15

Please sign in to comment.