From a90a7302b90a3bd54ea00790ad2f82dae568e70f Mon Sep 17 00:00:00 2001 From: Suhothayan Date: Sat, 17 Aug 2019 19:07:30 +0530 Subject: [PATCH 1/4] Add `createSiddhiAppSandboxRuntime()` to create a SiddhiApp without sources, sinks, and stores. Fixes #1438 --- .../java/io/siddhi/core/SiddhiManager.java | 39 +++ .../core/managment/SandboxTestCase.java | 225 ++++++++++++++++++ .../query/table/DeleteFromTableTestCase.java | 1 - 3 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java index c7e20b2c34..2da9538d0c 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java @@ -24,12 +24,15 @@ import io.siddhi.core.stream.output.sink.SinkHandlerManager; import io.siddhi.core.table.record.RecordTableHandlerManager; import io.siddhi.core.util.SiddhiAppRuntimeBuilder; +import io.siddhi.core.util.SiddhiConstants; import io.siddhi.core.util.config.ConfigManager; import io.siddhi.core.util.parser.SiddhiAppParser; import io.siddhi.core.util.persistence.InMemoryPersistenceStore; import io.siddhi.core.util.persistence.IncrementalPersistenceStore; import io.siddhi.core.util.persistence.PersistenceStore; import io.siddhi.query.api.SiddhiApp; +import io.siddhi.query.api.definition.StreamDefinition; +import io.siddhi.query.api.definition.TableDefinition; import io.siddhi.query.compiler.SiddhiCompiler; import org.apache.log4j.Logger; @@ -92,6 +95,42 @@ public SiddhiAppRuntime createSiddhiAppRuntime(String siddhiApp) { return createSiddhiAppRuntime(SiddhiCompiler.parse(updatedSiddhiApp), updatedSiddhiApp); } + /** + * Create a SiddhiApp Sandbox Runtime that runs without their Sources Sinks and Stores. + * + * @param siddhiApp SiddhiApp + * @return SiddhiAppRuntime without its Sources Sinks and Stores. + */ + public SiddhiAppRuntime createSiddhiAppSandboxRuntime(String siddhiApp) { + String updatedSiddhiApp = SiddhiCompiler.updateVariables(siddhiApp); + return createSiddhiAppRuntime(removeSourceSinkAndStoreAnnotations( + SiddhiCompiler.parse(updatedSiddhiApp)), updatedSiddhiApp); + } + + /** + * Create a SiddhiApp Sandbox Runtime that runs without their Sources Sinks and Stores. + * + * @param siddhiApp SiddhiApp + * @return SiddhiAppRuntime without its Sources Sinks and Stores. + */ + public SiddhiAppRuntime createSiddhiAppSandboxRuntime(SiddhiApp siddhiApp) { + return createSiddhiAppRuntime(removeSourceSinkAndStoreAnnotations(siddhiApp), null); + } + + private SiddhiApp removeSourceSinkAndStoreAnnotations(SiddhiApp siddhiApp) { + for (StreamDefinition streamDefinition : siddhiApp.getStreamDefinitionMap().values()) { + streamDefinition.getAnnotations().removeIf(annotation -> ( + annotation.getName().equalsIgnoreCase(SiddhiConstants.ANNOTATION_SOURCE) || + annotation.getName().equalsIgnoreCase(SiddhiConstants.ANNOTATION_SINK)) + && !annotation.getElement(SiddhiConstants.ANNOTATION_ELEMENT_TYPE).equalsIgnoreCase("inMemory")); + } + for (TableDefinition tableDefinition : siddhiApp.getTableDefinitionMap().values()) { + tableDefinition.getAnnotations().removeIf(annotation -> + annotation.getName().equalsIgnoreCase(SiddhiConstants.ANNOTATION_STORE)); + } + return siddhiApp; + } + /** * Method to retrieve already submitted siddhi app by providing the name. * diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java new file mode 100644 index 0000000000..5c60d78817 --- /dev/null +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java @@ -0,0 +1,225 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.core.managment; + +import io.siddhi.core.SiddhiAppRuntime; +import io.siddhi.core.SiddhiManager; +import io.siddhi.core.event.Event; +import io.siddhi.core.stream.input.InputHandler; +import io.siddhi.core.stream.input.source.Source; +import io.siddhi.core.stream.output.StreamCallback; +import io.siddhi.core.stream.output.sink.Sink; +import io.siddhi.core.table.InMemoryTable; +import io.siddhi.core.table.Table; +import io.siddhi.core.util.EventPrinter; +import io.siddhi.query.api.SiddhiApp; +import io.siddhi.query.compiler.SiddhiCompiler; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class SandboxTestCase { + private static final Logger log = Logger.getLogger(SandboxTestCase.class); + private AtomicInteger inEventCount; + private AtomicInteger removeEventCount; + private boolean eventArrived; + private AtomicInteger count; + + @BeforeMethod + public void init() { + count = new AtomicInteger(0); + inEventCount = new AtomicInteger(0); + removeEventCount = new AtomicInteger(0); + eventArrived = false; + } + + @Test + public void sandboxTest1() throws InterruptedException { + log.info("sandbox test1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String app = "" + + "@source(type='foo')" + + "@source(type='foo1')" + + "@sink(type='foo1')" + + "@source(type='inMemory', topic='myTopic')" + + "define stream StockStream (symbol string, price float, vol long);\n" + + "" + + "@sink(type='foo1')" + + "@sink(type='inMemory', topic='myTopic1')" + + "define stream DeleteStockStream (symbol string, price float, vol long);\n" + + "" + + "@store(type='rdbms')" + + "define table StockTable (symbol string, price float, volume long);\n" + + "" + + "define stream CountStockStream (symbol string);\n" + + "" + + "@info(name = 'query1') " + + "from StockStream " + + "select symbol, price, vol as volume " + + "insert into StockTable ;" + + "" + + "@info(name = 'query2') " + + "from DeleteStockStream[vol>=100] " + + "delete StockTable " + + " on StockTable.symbol==symbol ;" + + "" + + "@info(name = 'query3') " + + "from CountStockStream#window.length(0) join StockTable" + + " on CountStockStream.symbol==StockTable.symbol " + + "select CountStockStream.symbol as symbol " + + "insert into CountResultsStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppSandboxRuntime(app); + + Assert.assertEquals(siddhiAppRuntime.getSources().size(), 1); + Assert.assertEquals(siddhiAppRuntime.getSinks().size(), 1); + Assert.assertEquals(siddhiAppRuntime.getTables().size(), 1); + + for (List sources : siddhiAppRuntime.getSources()) { + for (Source source : sources) { + Assert.assertTrue(source.getType().equalsIgnoreCase("inMemory")); + } + } + + for (List sinks : siddhiAppRuntime.getSinks()) { + for (Sink sink : sinks) { + Assert.assertTrue(sink.getType().equalsIgnoreCase("inMemory")); + } + } + + for (Table table : siddhiAppRuntime.getTables()) { + Assert.assertTrue(table instanceof InMemoryTable); + } + + InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream"); + InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream"); + InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream"); + + siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + count.addAndGet(events.length); + } + }); + siddhiAppRuntime.start(); + + stockStream.send(new Object[]{"WSO2", 55.6f, 100L}); + stockStream.send(new Object[]{"IBM", 75.6f, 100L}); + stockStream.send(new Object[]{"WSO2", 57.6f, 100L}); + deleteStockStream.send(new Object[]{"IBM", 57.6f, 100L}); + countStockStream.send(new Object[]{"WSO2"}); + + Thread.sleep(500); + Assert.assertEquals(count.get(), 2); + siddhiAppRuntime.shutdown(); + + } + + @Test + public void sandboxTest2() throws InterruptedException { + log.info("sandbox test2"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String app = "" + + "@source(type='foo')" + + "@source(type='foo1')" + + "@sink(type='foo1')" + + "@source(type='inMemory', topic='myTopic')" + + "define stream StockStream (symbol string, price float, vol long);\n" + + "" + + "@sink(type='foo1')" + + "@sink(type='inMemory', topic='myTopic1')" + + "define stream DeleteStockStream (symbol string, price float, vol long);\n" + + "" + + "@store(type='rdbms')" + + "define table StockTable (symbol string, price float, volume long);\n" + + "" + + "define stream CountStockStream (symbol string);\n" + + "" + + "@info(name = 'query1') " + + "from StockStream " + + "select symbol, price, vol as volume " + + "insert into StockTable ;" + + "" + + "@info(name = 'query2') " + + "from DeleteStockStream[vol>=100] " + + "delete StockTable " + + " on StockTable.symbol==symbol ;" + + "" + + "@info(name = 'query3') " + + "from CountStockStream#window.length(0) join StockTable" + + " on CountStockStream.symbol==StockTable.symbol " + + "select CountStockStream.symbol as symbol " + + "insert into CountResultsStream ;"; + + SiddhiApp siddhiApp = SiddhiCompiler.parse(app); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppSandboxRuntime(siddhiApp); + + Assert.assertEquals(siddhiAppRuntime.getSources().size(), 1); + Assert.assertEquals(siddhiAppRuntime.getSinks().size(), 1); + Assert.assertEquals(siddhiAppRuntime.getTables().size(), 1); + + for (List sources : siddhiAppRuntime.getSources()) { + for (Source source : sources) { + Assert.assertTrue(source.getType().equalsIgnoreCase("inMemory")); + } + } + + for (List sinks : siddhiAppRuntime.getSinks()) { + for (Sink sink : sinks) { + Assert.assertTrue(sink.getType().equalsIgnoreCase("inMemory")); + } + } + + for (Table table : siddhiAppRuntime.getTables()) { + Assert.assertTrue(table instanceof InMemoryTable); + } + + InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream"); + InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream"); + InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream"); + + siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + count.addAndGet(events.length); + } + }); + siddhiAppRuntime.start(); + + stockStream.send(new Object[]{"WSO2", 55.6f, 100L}); + stockStream.send(new Object[]{"IBM", 75.6f, 100L}); + stockStream.send(new Object[]{"WSO2", 57.6f, 100L}); + deleteStockStream.send(new Object[]{"IBM", 57.6f, 100L}); + countStockStream.send(new Object[]{"WSO2"}); + + Thread.sleep(500); + Assert.assertEquals(count.get(), 2); + siddhiAppRuntime.shutdown(); + + } +} diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java index b138aa4368..f1fed93928 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java @@ -312,7 +312,6 @@ public void deleteFromTableTest5() throws InterruptedException { InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream"); InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream"); - siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() { @Override public void receive(Event[] events) { From 8f2274ec38007b7a72d01b42780941bf5fc85933 Mon Sep 17 00:00:00 2001 From: Suhothayan Date: Sat, 17 Aug 2019 19:18:58 +0530 Subject: [PATCH 2/4] Update the method name to `createSandboxSiddhiAppRuntime()` Fixes #1438 --- .../src/main/java/io/siddhi/core/SiddhiManager.java | 4 ++-- .../test/java/io/siddhi/core/managment/SandboxTestCase.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java index 2da9538d0c..814d9a1807 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/SiddhiManager.java @@ -101,7 +101,7 @@ public SiddhiAppRuntime createSiddhiAppRuntime(String siddhiApp) { * @param siddhiApp SiddhiApp * @return SiddhiAppRuntime without its Sources Sinks and Stores. */ - public SiddhiAppRuntime createSiddhiAppSandboxRuntime(String siddhiApp) { + public SiddhiAppRuntime createSandboxSiddhiAppRuntime(String siddhiApp) { String updatedSiddhiApp = SiddhiCompiler.updateVariables(siddhiApp); return createSiddhiAppRuntime(removeSourceSinkAndStoreAnnotations( SiddhiCompiler.parse(updatedSiddhiApp)), updatedSiddhiApp); @@ -113,7 +113,7 @@ public SiddhiAppRuntime createSiddhiAppSandboxRuntime(String siddhiApp) { * @param siddhiApp SiddhiApp * @return SiddhiAppRuntime without its Sources Sinks and Stores. */ - public SiddhiAppRuntime createSiddhiAppSandboxRuntime(SiddhiApp siddhiApp) { + public SiddhiAppRuntime createSandboxSiddhiAppRuntime(SiddhiApp siddhiApp) { return createSiddhiAppRuntime(removeSourceSinkAndStoreAnnotations(siddhiApp), null); } diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java index 5c60d78817..0aa948b097 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java @@ -90,7 +90,7 @@ public void sandboxTest1() throws InterruptedException { "select CountStockStream.symbol as symbol " + "insert into CountResultsStream ;"; - SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppSandboxRuntime(app); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSandboxSiddhiAppRuntime(app); Assert.assertEquals(siddhiAppRuntime.getSources().size(), 1); Assert.assertEquals(siddhiAppRuntime.getSinks().size(), 1); @@ -176,7 +176,7 @@ public void sandboxTest2() throws InterruptedException { "insert into CountResultsStream ;"; SiddhiApp siddhiApp = SiddhiCompiler.parse(app); - SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppSandboxRuntime(siddhiApp); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSandboxSiddhiAppRuntime(siddhiApp); Assert.assertEquals(siddhiAppRuntime.getSources().size(), 1); Assert.assertEquals(siddhiAppRuntime.getSinks().size(), 1); From ea09ae10544ee257c2a81b4e114f313260cc01aa Mon Sep 17 00:00:00 2001 From: Suhothayan Date: Sat, 17 Aug 2019 22:02:00 +0530 Subject: [PATCH 3/4] Add test case to the build --- modules/siddhi-core/src/test/resources/testng.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/siddhi-core/src/test/resources/testng.xml b/modules/siddhi-core/src/test/resources/testng.xml index e7be813521..13c9e715d9 100644 --- a/modules/siddhi-core/src/test/resources/testng.xml +++ b/modules/siddhi-core/src/test/resources/testng.xml @@ -38,6 +38,7 @@ + From d5c4cdca65503751d7359289a37e5dc60046a89f Mon Sep 17 00:00:00 2001 From: Chiran Fernando Date: Tue, 20 Aug 2019 12:03:59 +0530 Subject: [PATCH 4/4] Update license year --- .../src/test/java/io/siddhi/core/managment/SandboxTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java index 0aa948b097..1e2d4aca56 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/SandboxTestCase.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except