diff --git a/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java b/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java index 56bbd79..5025c38 100644 --- a/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java +++ b/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java @@ -23,6 +23,8 @@ import io.siddhi.core.event.Event; import io.siddhi.core.exception.SiddhiAppCreationException; import io.siddhi.core.stream.input.InputHandler; +import io.siddhi.core.stream.output.StreamCallback; +import io.siddhi.core.util.EventPrinter; import io.siddhi.core.util.SiddhiTestHelper; import io.siddhi.core.util.transport.InMemoryBroker; import org.apache.logging.log4j.LogManager; @@ -41,11 +43,13 @@ public class JsonSinkMapperTestCase { private final int timeout = 30000; private AtomicInteger wso2Count = new AtomicInteger(0); private AtomicInteger ibmCount = new AtomicInteger(0); + private volatile int count; @BeforeMethod public void init() { wso2Count.set(0); ibmCount.set(0); + count = 0; } /* @@ -1321,5 +1325,51 @@ public String getTopic() { InMemoryBroker.unsubscribe(subscriberWSO2); } + @Test + public void jsonSinkMapperTestCaseForNestedJson() { + // see https://github.com/wso2/streaming-integrator/issues/228 + log.info("JsonSinkMapperTestCase for a nested Json"); + + String app = "" + + "@App:name(\"PreDefinedProducerApp\")\n" + + "define stream DataStream" + + "(PublisherName string,AlertCategory String,AlertType String,recipientType String);\n" + + "@sink(type='log',\n" + + "@map(type='json', enclosing.element='event', validate.json='true', " + + "@payload(\"\"\"{\"alertTypeData\":{\"PublisherName\":\"{{PublisherName}}\",\"AlertCategory\"" + + ":\"{{AlertCategory}}\",\"AlertType\":\"{{AlertType}}\"},\"recipientData\":{\"recipientType\":" + + "\"{{recipientType}}\"}}\"\"\")))\n" + + "define stream DataWriteStream (PublisherName string, AlertCategory string, " + + "AlertType string, recipientType string);\n" + + "from DataStream\n" + + "select PublisherName, AlertCategory, AlertType , recipientType\n" + + "insert into DataWriteStream; "; + + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + siddhiAppRuntime.addCallback("DataWriteStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + count += events.length; + } + }); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("DataStream"); + siddhiAppRuntime.start(); + try { + stockStream.send(new Object[]{"WSO2", "IBM", "FOO", "BAR"}); + } catch (InterruptedException e) { + AssertJUnit.fail("Failed to send the message. cause: " + e.getMessage()); + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + AssertJUnit.fail("Failed to run the testcase. cause: " + e.getMessage()); + } + siddhiAppRuntime.shutdown(); + AssertJUnit.assertEquals(1, count); + } }