From 2b178589596346317d558814704d76f09c4dc71f Mon Sep 17 00:00:00 2001 From: dilini-muthumala Date: Sat, 24 Apr 2021 00:29:16 +0530 Subject: [PATCH] Add nested Json testcase and Bump siddhi version as needed A fix went to siddhi v5.1.20-SNAPSHOT is needed for this testcase to pass --- .../sinkmapper/JsonSinkMapperTestCase.java | 50 +++++++++++++++++++ pom.xml | 2 +- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java b/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java index 23c1edf..811d35d 100644 --- a/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java +++ b/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java @@ -23,6 +23,8 @@ import io.siddhi.core.event.Event; import io.siddhi.core.exception.SiddhiAppCreationException; import io.siddhi.core.stream.input.InputHandler; +import io.siddhi.core.stream.output.StreamCallback; +import io.siddhi.core.util.EventPrinter; import io.siddhi.core.util.SiddhiTestHelper; import io.siddhi.core.util.transport.InMemoryBroker; import org.apache.log4j.Logger; @@ -40,11 +42,13 @@ public class JsonSinkMapperTestCase { private final int timeout = 30000; private AtomicInteger wso2Count = new AtomicInteger(0); private AtomicInteger ibmCount = new AtomicInteger(0); + private volatile int count; @BeforeMethod public void init() { wso2Count.set(0); ibmCount.set(0); + count = 0; } /* @@ -1320,5 +1324,51 @@ public String getTopic() { InMemoryBroker.unsubscribe(subscriberWSO2); } + @Test + public void jsonSinkMapperTestCaseForNestedJson() { + // see https://github.com/wso2/streaming-integrator/issues/228 + log.info("JsonSinkMapperTestCase for a nested Json"); + + String app = "" + + "@App:name(\"PreDefinedProducerApp\")\n" + + "define stream DataStream" + + "(PublisherName string,AlertCategory String,AlertType String,recipientType String);\n" + + "@sink(type='log',\n" + + "@map(type='json', enclosing.element='event', validate.json='true', " + + "@payload(\"\"\"{\"alertTypeData\":{\"PublisherName\":\"{{PublisherName}}\",\"AlertCategory\"" + + ":\"{{AlertCategory}}\",\"AlertType\":\"{{AlertType}}\"},\"recipientData\":{\"recipientType\":" + + "\"{{recipientType}}\"}}\"\"\")))\n" + + "define stream DataWriteStream (PublisherName string, AlertCategory string, " + + "AlertType string, recipientType string);\n" + + "from DataStream\n" + + "select PublisherName, AlertCategory, AlertType , recipientType\n" + + "insert into DataWriteStream; "; + + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + siddhiAppRuntime.addCallback("DataWriteStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + count += events.length; + } + }); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("DataStream"); + siddhiAppRuntime.start(); + try { + stockStream.send(new Object[]{"WSO2", "IBM", "FOO", "BAR"}); + } catch (InterruptedException e) { + AssertJUnit.fail("Failed to send the message. cause: " + e.getMessage()); + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + AssertJUnit.fail("Failed to run the testcase. cause: " + e.getMessage()); + } + siddhiAppRuntime.shutdown(); + AssertJUnit.assertEquals(1, count); + } } diff --git a/pom.xml b/pom.xml index 8bbf9d2..003999f 100644 --- a/pom.xml +++ b/pom.xml @@ -135,7 +135,7 @@ - 5.1.18 + 5.1.20 [5.0.0,6.0.0) 1.2.17.wso2v1 3.0.0