Skip to content

Commit

Permalink
Merge pull request #1599 from suhothayan/master
Browse files Browse the repository at this point in the history
Add expression and expressionBatch window
  • Loading branch information
mohanvive authored Dec 25, 2019
2 parents eb3d43d + dd4f1a2 commit b2082be
Show file tree
Hide file tree
Showing 25 changed files with 3,336 additions and 60 deletions.
8 changes: 8 additions & 0 deletions findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
<Class name="io.siddhi.core.event.stream.MetaStreamEvent"/>
<Bug pattern="CN_IMPLEMENTS_CLONE_BUT_NOT_CLONEABLE"/>
</Match>
<Match>
<Class name="io.siddhi.core.query.processor.stream.window.ExpressionWindowProcessor$MetaStreamEventWrapper"/>
<Bug pattern="CN_IMPLEMENTS_CLONE_BUT_NOT_CLONEABLE"/>
</Match>
<Match>
<Class name="io.siddhi.core.query.processor.stream.window.ExpressionBatchWindowProcessor$MetaStreamEventWrapper"/>
<Bug pattern="CN_IMPLEMENTS_CLONE_BUT_NOT_CLONEABLE"/>
</Match>
<Match>
<Class name="io.siddhi.query.api.definition.StreamDefinition"/>
<Bug pattern="CN_IMPLEMENTS_CLONE_BUT_NOT_CLONEABLE"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private static void initMetaStreamEvent(MetaStreamEvent metaStreamEvent, Abstrac
String inputReferenceId) {
metaStreamEvent.addInputDefinition(inputDefinition);
metaStreamEvent.setInputReferenceId(inputReferenceId);
metaStreamEvent.initializeAfterWindowData();
metaStreamEvent.initializeOnAfterWindowData();
inputDefinition.getAttributeList().forEach(metaStreamEvent::addData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public List<Attribute> getOutputData() {
}
}

public void initializeOnAfterWindowData() {
if (onAfterWindowData == null) {
onAfterWindowData = new ArrayList<Attribute>();
}
}

@Deprecated
public void initializeAfterWindowData() {
if (onAfterWindowData == null) {
onAfterWindowData = new ArrayList<Attribute>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public StreamEventCloner(MetaStreamEvent metaStreamEvent, StreamEventFactory eve
*/
public StreamEvent copyStreamEvent(StreamEvent streamEvent) {
StreamEvent newEvent = eventFactory.newInstance();
if (beforeWindowDataSize > 0) {
if (streamEvent.getBeforeWindowData() == null) {
newEvent.setBeforeWindowData(null);
} else if (beforeWindowDataSize > 0) {
System.arraycopy(streamEvent.getBeforeWindowData(), 0, newEvent.getBeforeWindowData(), 0,
beforeWindowDataSize);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.siddhi.core.executor;

import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.executor.function.FunctionExecutor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.Attribute;

import static io.siddhi.core.util.SiddhiConstants.STREAM_EVENT_CHAIN_INDEX;
import static io.siddhi.core.util.SiddhiConstants.STREAM_EVENT_INDEX_IN_CHAIN;
import static io.siddhi.core.util.SiddhiConstants.UNKNOWN_STATE;

/**
* Executor class for Siddhi that extracts events.
*/
public class EventVariableFunctionExecutor extends FunctionExecutor {

private int[] position = new int[]{UNKNOWN_STATE, UNKNOWN_STATE};

public EventVariableFunctionExecutor(int streamEventChainIndex, int streamEventIndexInChain) {
position[STREAM_EVENT_CHAIN_INDEX] = streamEventChainIndex;
position[STREAM_EVENT_INDEX_IN_CHAIN] = streamEventIndexInChain;
}

@Override
protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
SiddhiQueryContext siddhiQueryContext) {
return null;
}

@Override
public Attribute.Type getReturnType() {
return Attribute.Type.OBJECT;
}

@Override
protected Object execute(Object[] data, State state) {
//will not occur
return null;
}

@Override
protected Object execute(Object data, State state) {
//will not occur
return null;
}

public Object execute(ComplexEvent event) {
return ((StateEvent) event).getStreamEvent(position);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
Expand All @@ -30,7 +31,6 @@
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;

/**
* Executor class for Siddhi cast function. Converts the given parameter according to the castTo parameter.
Expand All @@ -39,38 +39,55 @@
@Extension(
name = "eventTimestamp",
namespace = "",
description = "Returns the timestamp of the processed event.",
parameters = {},
description = "Returns the timestamp of the processed/passed event.",
parameters = {
@Parameter(name = "event",
description = "Event reference.",
type = {DataType.OBJECT},
dynamic = true,
optional = true,
defaultValue = "Current Event")
},
parameterOverloads = {
@ParameterOverload()
@ParameterOverload(),
@ParameterOverload(parameterNames = {"event"})
},
returnAttributes = @ReturnAttribute(
description = "timestamp of the event.",
description = "Timestamp of the event.",
type = DataType.LONG),
examples = {
@Example(
syntax = "from fooStream\n" +
syntax = "from FooStream\n" +
"select symbol as name, eventTimestamp() as eventTimestamp \n" +
"insert into barStream;",
description = "This will extract current events timestamp.")
"insert into BarStream;",
description = "Extracts current event's timestamp."),
@Example(
syntax = "from FooStream as f join FooBarTable as fb\n" +
"select fb.symbol as name, eventTimestamp(f) as eventTimestamp \n" +
"insert into BarStream;",
description = "Extracts FooStream event's timestamp.")
}
)
public class EventTimestampFunctionExecutor extends FunctionExecutor {

private boolean expectEventObject;

@Override
protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
SiddhiQueryContext siddhiQueryContext) {
if (attributeExpressionExecutors.length != 0) {
throw new SiddhiAppValidationException("Invalid no of arguments passed to eventTimestamp() function, " +
"required 0 parameters, but found " +
attributeExpressionExecutors.length);
if (attributeExpressionExecutors.length == 1) {
expectEventObject = true;
}
return null;
}

@Override
public Object execute(ComplexEvent event) {
return event.getTimestamp();
if (expectEventObject) {
return ((ComplexEvent) attributeExpressionExecutors[0].execute(event)).getTimestamp();
} else {
return event.getTimestamp();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ protected void processEventChunk(ComplexEventChunk<StreamEvent> streamEventChunk
StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater,
S state) {
streamEventChunk.reset();
while (streamEventChunk.hasNext()) {
StreamEvent streamEvent = streamEventChunk.next();
streamEvent.setBeforeWindowData(null);
}
streamEventChunk.reset();
process(streamEventChunk, nextProcessor, streamEventCloner, state);
}

Expand Down
Loading

0 comments on commit b2082be

Please sign in to comment.