Skip to content

Commit

Permalink
Merge pull request #1451 from suhothayan/master
Browse files Browse the repository at this point in the history
Add ability to create a Sandbox SiddhiAppRuntime for testing purposes.
  • Loading branch information
pcnfernando authored Aug 20, 2019
2 parents 98ab6bf + d5c4cdc commit 3ae30f2
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import io.siddhi.core.stream.output.sink.SinkHandlerManager;
import io.siddhi.core.table.record.RecordTableHandlerManager;
import io.siddhi.core.util.SiddhiAppRuntimeBuilder;
import io.siddhi.core.util.SiddhiConstants;
import io.siddhi.core.util.config.ConfigManager;
import io.siddhi.core.util.parser.SiddhiAppParser;
import io.siddhi.core.util.persistence.InMemoryPersistenceStore;
import io.siddhi.core.util.persistence.IncrementalPersistenceStore;
import io.siddhi.core.util.persistence.PersistenceStore;
import io.siddhi.query.api.SiddhiApp;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.compiler.SiddhiCompiler;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -92,6 +95,42 @@ public SiddhiAppRuntime createSiddhiAppRuntime(String siddhiApp) {
return createSiddhiAppRuntime(SiddhiCompiler.parse(updatedSiddhiApp), updatedSiddhiApp);
}

/**
* Create a SiddhiApp Sandbox Runtime that runs without their Sources Sinks and Stores.
*
* @param siddhiApp SiddhiApp
* @return SiddhiAppRuntime without its Sources Sinks and Stores.
*/
public SiddhiAppRuntime createSandboxSiddhiAppRuntime(String siddhiApp) {
String updatedSiddhiApp = SiddhiCompiler.updateVariables(siddhiApp);
return createSiddhiAppRuntime(removeSourceSinkAndStoreAnnotations(
SiddhiCompiler.parse(updatedSiddhiApp)), updatedSiddhiApp);
}

/**
* Create a SiddhiApp Sandbox Runtime that runs without their Sources Sinks and Stores.
*
* @param siddhiApp SiddhiApp
* @return SiddhiAppRuntime without its Sources Sinks and Stores.
*/
public SiddhiAppRuntime createSandboxSiddhiAppRuntime(SiddhiApp siddhiApp) {
return createSiddhiAppRuntime(removeSourceSinkAndStoreAnnotations(siddhiApp), null);
}

private SiddhiApp removeSourceSinkAndStoreAnnotations(SiddhiApp siddhiApp) {
for (StreamDefinition streamDefinition : siddhiApp.getStreamDefinitionMap().values()) {
streamDefinition.getAnnotations().removeIf(annotation -> (
annotation.getName().equalsIgnoreCase(SiddhiConstants.ANNOTATION_SOURCE) ||
annotation.getName().equalsIgnoreCase(SiddhiConstants.ANNOTATION_SINK))
&& !annotation.getElement(SiddhiConstants.ANNOTATION_ELEMENT_TYPE).equalsIgnoreCase("inMemory"));
}
for (TableDefinition tableDefinition : siddhiApp.getTableDefinitionMap().values()) {
tableDefinition.getAnnotations().removeIf(annotation ->
annotation.getName().equalsIgnoreCase(SiddhiConstants.ANNOTATION_STORE));
}
return siddhiApp;
}

/**
* Method to retrieve already submitted siddhi app by providing the name.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.siddhi.core.managment;

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.table.InMemoryTable;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.EventPrinter;
import io.siddhi.query.api.SiddhiApp;
import io.siddhi.query.compiler.SiddhiCompiler;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SandboxTestCase {
private static final Logger log = Logger.getLogger(SandboxTestCase.class);
private AtomicInteger inEventCount;
private AtomicInteger removeEventCount;
private boolean eventArrived;
private AtomicInteger count;

@BeforeMethod
public void init() {
count = new AtomicInteger(0);
inEventCount = new AtomicInteger(0);
removeEventCount = new AtomicInteger(0);
eventArrived = false;
}

@Test
public void sandboxTest1() throws InterruptedException {
log.info("sandbox test1");

SiddhiManager siddhiManager = new SiddhiManager();

String app = "" +
"@source(type='foo')" +
"@source(type='foo1')" +
"@sink(type='foo1')" +
"@source(type='inMemory', topic='myTopic')" +
"define stream StockStream (symbol string, price float, vol long);\n" +
"" +
"@sink(type='foo1')" +
"@sink(type='inMemory', topic='myTopic1')" +
"define stream DeleteStockStream (symbol string, price float, vol long);\n" +
"" +
"@store(type='rdbms')" +
"define table StockTable (symbol string, price float, volume long);\n" +
"" +
"define stream CountStockStream (symbol string);\n" +
"" +
"@info(name = 'query1') " +
"from StockStream " +
"select symbol, price, vol as volume " +
"insert into StockTable ;" +
"" +
"@info(name = 'query2') " +
"from DeleteStockStream[vol>=100] " +
"delete StockTable " +
" on StockTable.symbol==symbol ;" +
"" +
"@info(name = 'query3') " +
"from CountStockStream#window.length(0) join StockTable" +
" on CountStockStream.symbol==StockTable.symbol " +
"select CountStockStream.symbol as symbol " +
"insert into CountResultsStream ;";

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSandboxSiddhiAppRuntime(app);

Assert.assertEquals(siddhiAppRuntime.getSources().size(), 1);
Assert.assertEquals(siddhiAppRuntime.getSinks().size(), 1);
Assert.assertEquals(siddhiAppRuntime.getTables().size(), 1);

for (List<Source> sources : siddhiAppRuntime.getSources()) {
for (Source source : sources) {
Assert.assertTrue(source.getType().equalsIgnoreCase("inMemory"));
}
}

for (List<Sink> sinks : siddhiAppRuntime.getSinks()) {
for (Sink sink : sinks) {
Assert.assertTrue(sink.getType().equalsIgnoreCase("inMemory"));
}
}

for (Table table : siddhiAppRuntime.getTables()) {
Assert.assertTrue(table instanceof InMemoryTable);
}

InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream");
InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream");

siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
count.addAndGet(events.length);
}
});
siddhiAppRuntime.start();

stockStream.send(new Object[]{"WSO2", 55.6f, 100L});
stockStream.send(new Object[]{"IBM", 75.6f, 100L});
stockStream.send(new Object[]{"WSO2", 57.6f, 100L});
deleteStockStream.send(new Object[]{"IBM", 57.6f, 100L});
countStockStream.send(new Object[]{"WSO2"});

Thread.sleep(500);
Assert.assertEquals(count.get(), 2);
siddhiAppRuntime.shutdown();

}

@Test
public void sandboxTest2() throws InterruptedException {
log.info("sandbox test2");

SiddhiManager siddhiManager = new SiddhiManager();

String app = "" +
"@source(type='foo')" +
"@source(type='foo1')" +
"@sink(type='foo1')" +
"@source(type='inMemory', topic='myTopic')" +
"define stream StockStream (symbol string, price float, vol long);\n" +
"" +
"@sink(type='foo1')" +
"@sink(type='inMemory', topic='myTopic1')" +
"define stream DeleteStockStream (symbol string, price float, vol long);\n" +
"" +
"@store(type='rdbms')" +
"define table StockTable (symbol string, price float, volume long);\n" +
"" +
"define stream CountStockStream (symbol string);\n" +
"" +
"@info(name = 'query1') " +
"from StockStream " +
"select symbol, price, vol as volume " +
"insert into StockTable ;" +
"" +
"@info(name = 'query2') " +
"from DeleteStockStream[vol>=100] " +
"delete StockTable " +
" on StockTable.symbol==symbol ;" +
"" +
"@info(name = 'query3') " +
"from CountStockStream#window.length(0) join StockTable" +
" on CountStockStream.symbol==StockTable.symbol " +
"select CountStockStream.symbol as symbol " +
"insert into CountResultsStream ;";

SiddhiApp siddhiApp = SiddhiCompiler.parse(app);
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSandboxSiddhiAppRuntime(siddhiApp);

Assert.assertEquals(siddhiAppRuntime.getSources().size(), 1);
Assert.assertEquals(siddhiAppRuntime.getSinks().size(), 1);
Assert.assertEquals(siddhiAppRuntime.getTables().size(), 1);

for (List<Source> sources : siddhiAppRuntime.getSources()) {
for (Source source : sources) {
Assert.assertTrue(source.getType().equalsIgnoreCase("inMemory"));
}
}

for (List<Sink> sinks : siddhiAppRuntime.getSinks()) {
for (Sink sink : sinks) {
Assert.assertTrue(sink.getType().equalsIgnoreCase("inMemory"));
}
}

for (Table table : siddhiAppRuntime.getTables()) {
Assert.assertTrue(table instanceof InMemoryTable);
}

InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream");
InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream");

siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
count.addAndGet(events.length);
}
});
siddhiAppRuntime.start();

stockStream.send(new Object[]{"WSO2", 55.6f, 100L});
stockStream.send(new Object[]{"IBM", 75.6f, 100L});
stockStream.send(new Object[]{"WSO2", 57.6f, 100L});
deleteStockStream.send(new Object[]{"IBM", 57.6f, 100L});
countStockStream.send(new Object[]{"WSO2"});

Thread.sleep(500);
Assert.assertEquals(count.get(), 2);
siddhiAppRuntime.shutdown();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ public void deleteFromTableTest5() throws InterruptedException {
InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream");
InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream");


siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
Expand Down
1 change: 1 addition & 0 deletions modules/siddhi-core/src/test/resources/testng.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<class name="io.siddhi.core.managment.PlaybackTestCase"/>
<class name="io.siddhi.core.managment.LogTestCase"/>
<class name="io.siddhi.core.managment.StateTestCase"/>
<class name="io.siddhi.core.managment.SandboxTestCase"/>
<class name="io.siddhi.core.managment.StatisticsTestCase"/>
<class name="io.siddhi.core.managment.PersistenceTestCase"/>
<class name="io.siddhi.core.managment.IncrementalPersistenceTestCase"/>
Expand Down

0 comments on commit 3ae30f2

Please sign in to comment.