diff --git a/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java b/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java index b7ec0c5..23c1edf 100644 --- a/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java +++ b/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java @@ -1188,4 +1188,137 @@ public String getTopic() { //unsubscribe from "inMemory" broker per topic InMemoryBroker.unsubscribe(subscriberWSO2); } + + @Test + public void jsonSinkMapperTestCaseForPreserveNullValue() throws InterruptedException { + log.info("jsonSinkMapperTestCaseForPreserveNullValue"); + InMemoryBroker.Subscriber subscriberWSO2 = new InMemoryBroker.Subscriber() { + @Override + public void onMessage(Object msg) { + String jsonString; + switch (wso2Count.incrementAndGet()) { + case 1: + jsonString = "{\"event\":{\"symbol\":\"WSO2\",\"price\":55.6,\"volume\":null}}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + case 2: + jsonString = "{\"event\":{\"symbol\":\"WSO2\",\"price\":null,\"volume\":100}}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + case 3: + jsonString = "{\"event\":{\"symbol\":null,\"price\":55.6,\"volume\":100}}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + default: + AssertJUnit.fail(); + } + } + @Override + public String getTopic() { + return "WSO2"; + } + }; + //subscribe to "inMemory" broker per topic + InMemoryBroker.subscribe(subscriberWSO2); + String streams = "" + + "@App:name('TestSiddhiApp')" + + "define stream FooStream (symbol string, price float, volume long); " + + "@sink(type='inMemory', topic='WSO2', @map(type='json', enable.null.attribute.value='true')) " + + "define stream BarStream (symbol string, price float, volume long); "; + String query = "" + + "from FooStream " + + "select * " + + "insert into BarStream; "; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("FooStream"); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{"WSO2", 55.6f, null}); + stockStream.send(new Object[]{"WSO2", null, 100L}); + stockStream.send(new Object[]{null, 55.6f, 100L}); + SiddhiTestHelper.waitForEvents(waitTime, 3, wso2Count, timeout); + //assert event count + AssertJUnit.assertEquals(3, wso2Count.get()); + siddhiAppRuntime.shutdown(); + //unsubscribe from "inMemory" broker per topic + InMemoryBroker.unsubscribe(subscriberWSO2); + } + + @Test + public void jsonSinkCustomMapperTestCaseForPreserveNullValue() throws InterruptedException { + log.info("jsonSinkCustomMapperTestCaseForPreserveNullValue"); + InMemoryBroker.Subscriber subscriberWSO2 = new InMemoryBroker.Subscriber() { + @Override + public void onMessage(Object msg) { + String jsonString; + switch (wso2Count.incrementAndGet()) { + case 1: + jsonString = "{\n" + + " \"Stock Data\":{\n" + + " \"Symbol\":\"WSO2\",\n" + + " \"Price\":55.6,\n" + + " \"Volume\":null\n" + + " }\n" + + "}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + case 2: + jsonString = "{\n" + + " \"Stock Data\":{\n" + + " \"Symbol\":\"WSO2\",\n" + + " \"Price\":null,\n" + + " \"Volume\":100\n" + + " }\n" + + "}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + case 3: + jsonString = "{\n" + + " \"Stock Data\":{\n" + + " \"Symbol\":\"null\",\n" + + " \"Price\":55.6,\n" + + " \"Volume\":100\n" + + " }\n" + + "}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + default: + AssertJUnit.fail(); + } + } + @Override + public String getTopic() { + return "WSO2"; + } + }; + //subscribe to "inMemory" broker per topic + InMemoryBroker.subscribe(subscriberWSO2); + String streams = "" + + "@App:name('TestSiddhiApp')" + + "define stream FooStream (symbol string, price float, volume long); " + + "@sink(type='inMemory', topic='WSO2', @map(type='json', enable.null.attribute.value='true'," + + "@payload(\"\"\"{\n" + " \"Stock Data\":{\n" + + " \"Symbol\":\"{{symbol}}\",\n" + " \"Price\":{{price}},\n" + + " \"Volume\":{{volume}}\n" + " }\n" + "}\"\"\"))) " + + "define stream BarStream (symbol string, price float, volume long); "; + String query = "" + + "from FooStream " + + "select * " + + "insert into BarStream; "; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("FooStream"); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{"WSO2", 55.6f, null}); + stockStream.send(new Object[]{"WSO2", null, 100L}); + stockStream.send(new Object[]{null, 55.6f, 100L}); + SiddhiTestHelper.waitForEvents(waitTime, 3, wso2Count, timeout); + //assert event count + AssertJUnit.assertEquals(3, wso2Count.get()); + siddhiAppRuntime.shutdown(); + //unsubscribe from "inMemory" broker per topic + InMemoryBroker.unsubscribe(subscriberWSO2); + } + + }