diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java index c7e20b2c34..814d9a1807 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java @@ -24,12 +24,15 @@ import io.siddhi.core.stream.output.sink.SinkHandlerManager; import io.siddhi.core.table.record.RecordTableHandlerManager; import io.siddhi.core.util.SiddhiAppRuntimeBuilder; +import io.siddhi.core.util.SiddhiConstants; import io.siddhi.core.util.config.ConfigManager; import io.siddhi.core.util.parser.SiddhiAppParser; import io.siddhi.core.util.persistence.InMemoryPersistenceStore; import io.siddhi.core.util.persistence.IncrementalPersistenceStore; import io.siddhi.core.util.persistence.PersistenceStore; import io.siddhi.query.api.SiddhiApp; +import io.siddhi.query.api.definition.StreamDefinition; +import io.siddhi.query.api.definition.TableDefinition; import io.siddhi.query.compiler.SiddhiCompiler; import org.apache.log4j.Logger; @@ -92,6 +95,42 @@ public SiddhiAppRuntime createSiddhiAppRuntime(String siddhiApp) { return createSiddhiAppRuntime(SiddhiCompiler.parse(updatedSiddhiApp), updatedSiddhiApp); } + /** + * Create a SiddhiApp Sandbox Runtime that runs without their Sources Sinks and Stores. + * + * @param siddhiApp SiddhiApp + * @return SiddhiAppRuntime without its Sources Sinks and Stores. + */ + public SiddhiAppRuntime createSandboxSiddhiAppRuntime(String siddhiApp) { + String updatedSiddhiApp = SiddhiCompiler.updateVariables(siddhiApp); + return createSiddhiAppRuntime(removeSourceSinkAndStoreAnnotations( + SiddhiCompiler.parse(updatedSiddhiApp)), updatedSiddhiApp); + } + + /** + * Create a SiddhiApp Sandbox Runtime that runs without their Sources Sinks and Stores. + * + * @param siddhiApp SiddhiApp + * @return SiddhiAppRuntime without its Sources Sinks and Stores. + */ + public SiddhiAppRuntime createSandboxSiddhiAppRuntime(SiddhiApp siddhiApp) { + return createSiddhiAppRuntime(removeSourceSinkAndStoreAnnotations(siddhiApp), null); + } + + private SiddhiApp removeSourceSinkAndStoreAnnotations(SiddhiApp siddhiApp) { + for (StreamDefinition streamDefinition : siddhiApp.getStreamDefinitionMap().values()) { + streamDefinition.getAnnotations().removeIf(annotation -> ( + annotation.getName().equalsIgnoreCase(SiddhiConstants.ANNOTATION_SOURCE) || + annotation.getName().equalsIgnoreCase(SiddhiConstants.ANNOTATION_SINK)) + && !annotation.getElement(SiddhiConstants.ANNOTATION_ELEMENT_TYPE).equalsIgnoreCase("inMemory")); + } + for (TableDefinition tableDefinition : siddhiApp.getTableDefinitionMap().values()) { + tableDefinition.getAnnotations().removeIf(annotation -> + annotation.getName().equalsIgnoreCase(SiddhiConstants.ANNOTATION_STORE)); + } + return siddhiApp; + } + /** * Method to retrieve already submitted siddhi app by providing the name. * diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java new file mode 100644 index 0000000000..1e2d4aca56 --- /dev/null +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java @@ -0,0 +1,225 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.core.managment; + +import io.siddhi.core.SiddhiAppRuntime; +import io.siddhi.core.SiddhiManager; +import io.siddhi.core.event.Event; +import io.siddhi.core.stream.input.InputHandler; +import io.siddhi.core.stream.input.source.Source; +import io.siddhi.core.stream.output.StreamCallback; +import io.siddhi.core.stream.output.sink.Sink; +import io.siddhi.core.table.InMemoryTable; +import io.siddhi.core.table.Table; +import io.siddhi.core.util.EventPrinter; +import io.siddhi.query.api.SiddhiApp; +import io.siddhi.query.compiler.SiddhiCompiler; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class SandboxTestCase { + private static final Logger log = Logger.getLogger(SandboxTestCase.class); + private AtomicInteger inEventCount; + private AtomicInteger removeEventCount; + private boolean eventArrived; + private AtomicInteger count; + + @BeforeMethod + public void init() { + count = new AtomicInteger(0); + inEventCount = new AtomicInteger(0); + removeEventCount = new AtomicInteger(0); + eventArrived = false; + } + + @Test + public void sandboxTest1() throws InterruptedException { + log.info("sandbox test1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String app = "" + + "@source(type='foo')" + + "@source(type='foo1')" + + "@sink(type='foo1')" + + "@source(type='inMemory', topic='myTopic')" + + "define stream StockStream (symbol string, price float, vol long);\n" + + "" + + "@sink(type='foo1')" + + "@sink(type='inMemory', topic='myTopic1')" + + "define stream DeleteStockStream (symbol string, price float, vol long);\n" + + "" + + "@store(type='rdbms')" + + "define table StockTable (symbol string, price float, volume long);\n" + + "" + + "define stream CountStockStream (symbol string);\n" + + "" + + "@info(name = 'query1') " + + "from StockStream " + + "select symbol, price, vol as volume " + + "insert into StockTable ;" + + "" + + "@info(name = 'query2') " + + "from DeleteStockStream[vol>=100] " + + "delete StockTable " + + " on StockTable.symbol==symbol ;" + + "" + + "@info(name = 'query3') " + + "from CountStockStream#window.length(0) join StockTable" + + " on CountStockStream.symbol==StockTable.symbol " + + "select CountStockStream.symbol as symbol " + + "insert into CountResultsStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSandboxSiddhiAppRuntime(app); + + Assert.assertEquals(siddhiAppRuntime.getSources().size(), 1); + Assert.assertEquals(siddhiAppRuntime.getSinks().size(), 1); + Assert.assertEquals(siddhiAppRuntime.getTables().size(), 1); + + for (List sources : siddhiAppRuntime.getSources()) { + for (Source source : sources) { + Assert.assertTrue(source.getType().equalsIgnoreCase("inMemory")); + } + } + + for (List sinks : siddhiAppRuntime.getSinks()) { + for (Sink sink : sinks) { + Assert.assertTrue(sink.getType().equalsIgnoreCase("inMemory")); + } + } + + for (Table table : siddhiAppRuntime.getTables()) { + Assert.assertTrue(table instanceof InMemoryTable); + } + + InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream"); + InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream"); + InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream"); + + siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + count.addAndGet(events.length); + } + }); + siddhiAppRuntime.start(); + + stockStream.send(new Object[]{"WSO2", 55.6f, 100L}); + stockStream.send(new Object[]{"IBM", 75.6f, 100L}); + stockStream.send(new Object[]{"WSO2", 57.6f, 100L}); + deleteStockStream.send(new Object[]{"IBM", 57.6f, 100L}); + countStockStream.send(new Object[]{"WSO2"}); + + Thread.sleep(500); + Assert.assertEquals(count.get(), 2); + siddhiAppRuntime.shutdown(); + + } + + @Test + public void sandboxTest2() throws InterruptedException { + log.info("sandbox test2"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String app = "" + + "@source(type='foo')" + + "@source(type='foo1')" + + "@sink(type='foo1')" + + "@source(type='inMemory', topic='myTopic')" + + "define stream StockStream (symbol string, price float, vol long);\n" + + "" + + "@sink(type='foo1')" + + "@sink(type='inMemory', topic='myTopic1')" + + "define stream DeleteStockStream (symbol string, price float, vol long);\n" + + "" + + "@store(type='rdbms')" + + "define table StockTable (symbol string, price float, volume long);\n" + + "" + + "define stream CountStockStream (symbol string);\n" + + "" + + "@info(name = 'query1') " + + "from StockStream " + + "select symbol, price, vol as volume " + + "insert into StockTable ;" + + "" + + "@info(name = 'query2') " + + "from DeleteStockStream[vol>=100] " + + "delete StockTable " + + " on StockTable.symbol==symbol ;" + + "" + + "@info(name = 'query3') " + + "from CountStockStream#window.length(0) join StockTable" + + " on CountStockStream.symbol==StockTable.symbol " + + "select CountStockStream.symbol as symbol " + + "insert into CountResultsStream ;"; + + SiddhiApp siddhiApp = SiddhiCompiler.parse(app); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSandboxSiddhiAppRuntime(siddhiApp); + + Assert.assertEquals(siddhiAppRuntime.getSources().size(), 1); + Assert.assertEquals(siddhiAppRuntime.getSinks().size(), 1); + Assert.assertEquals(siddhiAppRuntime.getTables().size(), 1); + + for (List sources : siddhiAppRuntime.getSources()) { + for (Source source : sources) { + Assert.assertTrue(source.getType().equalsIgnoreCase("inMemory")); + } + } + + for (List sinks : siddhiAppRuntime.getSinks()) { + for (Sink sink : sinks) { + Assert.assertTrue(sink.getType().equalsIgnoreCase("inMemory")); + } + } + + for (Table table : siddhiAppRuntime.getTables()) { + Assert.assertTrue(table instanceof InMemoryTable); + } + + InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream"); + InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream"); + InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream"); + + siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + count.addAndGet(events.length); + } + }); + siddhiAppRuntime.start(); + + stockStream.send(new Object[]{"WSO2", 55.6f, 100L}); + stockStream.send(new Object[]{"IBM", 75.6f, 100L}); + stockStream.send(new Object[]{"WSO2", 57.6f, 100L}); + deleteStockStream.send(new Object[]{"IBM", 57.6f, 100L}); + countStockStream.send(new Object[]{"WSO2"}); + + Thread.sleep(500); + Assert.assertEquals(count.get(), 2); + siddhiAppRuntime.shutdown(); + + } +} diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java index b138aa4368..f1fed93928 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java @@ -312,7 +312,6 @@ public void deleteFromTableTest5() throws InterruptedException { InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream"); InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream"); - siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() { @Override public void receive(Event[] events) { diff --git a/modules/siddhi-core/src/test/resources/testng.xml b/modules/siddhi-core/src/test/resources/testng.xml index 0a2d660e2a..18b63baf4f 100644 --- a/modules/siddhi-core/src/test/resources/testng.xml +++ b/modules/siddhi-core/src/test/resources/testng.xml @@ -38,6 +38,7 @@ +