diff --git a/component/src/main/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapper.java b/component/src/main/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapper.java index 769f879..6008b24 100644 --- a/component/src/main/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapper.java +++ b/component/src/main/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapper.java @@ -18,6 +18,7 @@ package io.siddhi.extension.map.json.sinkmapper; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.siddhi.annotation.Example; @@ -79,7 +80,13 @@ "'true' or the value can be set to 'false' to separate events", type = {DataType.BOOL}, optional = true, - defaultValue = "true") + defaultValue = "true"), + @Parameter(name = "enable.null.attribute.value", + description = "If this parameter is true, output parameter values will contain null values " + + "if not they will be undefined", + type = {DataType.BOOL}, + optional = true, + defaultValue = "false") }, examples = { @Example( @@ -119,6 +126,7 @@ public class JsonSinkMapper extends SinkMapper { private static final String DEFAULT_ENCLOSING_ELEMENT = "$"; private static final String JSON_VALIDATION_IDENTIFIER = "validate.json"; private static final String EVENT_GROUPING_ENABLED = "event.grouping.enabled"; + private static final String ENABLE_NULL_ATTRIBUTE_VALUE = "enable.null.attribute.value"; private static final String JSON_EVENT_SEPERATOR = ","; private static final String JSON_KEYVALUE_SEPERATOR = ":"; private static final String JSON_ARRAY_START_SYMBOL = "["; @@ -131,6 +139,7 @@ public class JsonSinkMapper extends SinkMapper { private String enclosingElement = null; private boolean isJsonValidationEnabled = false; private boolean eventGroupingEnabled = true; + private boolean enableNullAttributeValue = false; @Override @@ -162,6 +171,8 @@ public void init(StreamDefinition streamDefinition, OptionHolder optionHolder, .validateAndGetStaticValue(JSON_VALIDATION_IDENTIFIER, "false")); this.eventGroupingEnabled = Boolean.parseBoolean(optionHolder .validateAndGetStaticValue(EVENT_GROUPING_ENABLED, "true")); + this.enableNullAttributeValue = Boolean.parseBoolean(optionHolder + .validateAndGetStaticValue(ENABLE_NULL_ATTRIBUTE_VALUE, "false")); //if @payload() is added there must be at least 1 element in it, otherwise a SiddhiParserException raised if (payloadTemplateBuilderMap != null && payloadTemplateBuilderMap.size() != 1) { @@ -245,12 +256,23 @@ private String constructJsonForDefaultMapping(Object eventObj) { } if (eventObj instanceof Event) { Event event = (Event) eventObj; - JsonObject jsonEvent = constructSingleEventForDefaultMapping(doPartialProcessing(event)); + JsonObject jsonEvent; + if (enableNullAttributeValue) { + jsonEvent = constructSingleEventForDefaultMapping(event); + } else { + jsonEvent = constructSingleEventForDefaultMapping(doPartialProcessing(event)); + } + sb.append(jsonEvent); } else if (eventObj instanceof Event[]) { JsonArray eventArray = new JsonArray(); for (Event event : (Event[]) eventObj) { - eventArray.add(constructSingleEventForDefaultMapping(doPartialProcessing(event))); + if (enableNullAttributeValue) { + eventArray.add(constructSingleEventForDefaultMapping(event)); + } else { + eventArray.add(constructSingleEventForDefaultMapping(doPartialProcessing(event))); + } + } sb.append(eventArray.toString()); } else { @@ -265,12 +287,23 @@ private String constructJsonForDefaultMapping(Object eventObj) { } else { if (eventObj instanceof Event) { Event event = (Event) eventObj; - JsonObject jsonEvent = constructSingleEventForDefaultMapping(doPartialProcessing(event)); + JsonObject jsonEvent; + if (enableNullAttributeValue) { + jsonEvent = constructSingleEventForDefaultMapping(event); + } else { + jsonEvent = constructSingleEventForDefaultMapping(doPartialProcessing(event)); + } + return jsonEvent.toString(); } else if (eventObj instanceof Event[]) { JsonArray eventArray = new JsonArray(); for (Event event : (Event[]) eventObj) { - eventArray.add(constructSingleEventForDefaultMapping(doPartialProcessing(event))); + if (enableNullAttributeValue) { + eventArray.add(constructSingleEventForDefaultMapping(event)); + } else { + eventArray.add(constructSingleEventForDefaultMapping(doPartialProcessing(event))); + } + } return (eventArray.toString()); } else { @@ -298,13 +331,23 @@ private String constructJsonForCustomMapping(Object eventObj, TemplateBuilder pa } } if (eventObj instanceof Event) { - Event event = doPartialProcessing((Event) eventObj); + Event event; + if (enableNullAttributeValue) { + event = (Event) eventObj; + } else { + event = doPartialProcessing((Event) eventObj); + } sb.append(payloadTemplateBuilder.build(event)); } else if (eventObj instanceof Event[]) { String jsonEvent; sb.append(JSON_ARRAY_START_SYMBOL); for (Event e : (Event[]) eventObj) { - jsonEvent = (String) payloadTemplateBuilder.build(doPartialProcessing(e)); + if (enableNullAttributeValue) { + jsonEvent = (String) payloadTemplateBuilder.build(e); + } else { + jsonEvent = (String) payloadTemplateBuilder.build(doPartialProcessing(e)); + } + if (jsonEvent != null) { sb.append(jsonEvent).append(JSON_EVENT_SEPERATOR).append("\n"); } @@ -322,12 +365,21 @@ private String constructJsonForCustomMapping(Object eventObj, TemplateBuilder pa return sb.toString(); } else { if (eventObj.getClass() == Event.class) { - return (String) payloadTemplateBuilder.build(doPartialProcessing((Event) eventObj)); + if (enableNullAttributeValue) { + return (String) payloadTemplateBuilder.build((Event) eventObj); + } else { + return (String) payloadTemplateBuilder.build(doPartialProcessing((Event) eventObj)); + } } else if (eventObj.getClass() == Event[].class) { String jsonEvent; sb.append(JSON_ARRAY_START_SYMBOL); for (Event event : (Event[]) eventObj) { - jsonEvent = (String) payloadTemplateBuilder.build(doPartialProcessing(event)); + if (enableNullAttributeValue) { + jsonEvent = (String) payloadTemplateBuilder.build(event); + } else { + jsonEvent = (String) payloadTemplateBuilder.build(doPartialProcessing(event)); + } + if (jsonEvent != null) { sb.append(jsonEvent).append(JSON_EVENT_SEPERATOR).append("\n"); } @@ -349,7 +401,16 @@ private JsonObject constructSingleEventForDefaultMapping(Event event) { JsonObject innerParentObject = new JsonObject(); String attributeName; Object attributeValue; - Gson gson = new Gson(); + Gson gson; + if (enableNullAttributeValue) { + gson = new GsonBuilder() + .serializeNulls() + .setPrettyPrinting() + .create(); + } else { + gson = new Gson(); + } + for (int i = 0; i < data.length; i++) { attributeName = attributeNameArray[i]; attributeValue = data[i]; @@ -365,6 +426,8 @@ private JsonObject constructSingleEventForDefaultMapping(Event event) { innerParentObject.add(attributeName, gson.toJsonTree(attributeValue)); } } + } else { + innerParentObject.add(attributeName, null); } } jsonEventObject.add(EVENT_PARENT_TAG, innerParentObject); diff --git a/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java b/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java index b7ec0c5..23c1edf 100644 --- a/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java +++ b/component/src/test/java/io/siddhi/extension/map/json/sinkmapper/JsonSinkMapperTestCase.java @@ -1188,4 +1188,137 @@ public String getTopic() { //unsubscribe from "inMemory" broker per topic InMemoryBroker.unsubscribe(subscriberWSO2); } + + @Test + public void jsonSinkMapperTestCaseForPreserveNullValue() throws InterruptedException { + log.info("jsonSinkMapperTestCaseForPreserveNullValue"); + InMemoryBroker.Subscriber subscriberWSO2 = new InMemoryBroker.Subscriber() { + @Override + public void onMessage(Object msg) { + String jsonString; + switch (wso2Count.incrementAndGet()) { + case 1: + jsonString = "{\"event\":{\"symbol\":\"WSO2\",\"price\":55.6,\"volume\":null}}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + case 2: + jsonString = "{\"event\":{\"symbol\":\"WSO2\",\"price\":null,\"volume\":100}}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + case 3: + jsonString = "{\"event\":{\"symbol\":null,\"price\":55.6,\"volume\":100}}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + default: + AssertJUnit.fail(); + } + } + @Override + public String getTopic() { + return "WSO2"; + } + }; + //subscribe to "inMemory" broker per topic + InMemoryBroker.subscribe(subscriberWSO2); + String streams = "" + + "@App:name('TestSiddhiApp')" + + "define stream FooStream (symbol string, price float, volume long); " + + "@sink(type='inMemory', topic='WSO2', @map(type='json', enable.null.attribute.value='true')) " + + "define stream BarStream (symbol string, price float, volume long); "; + String query = "" + + "from FooStream " + + "select * " + + "insert into BarStream; "; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("FooStream"); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{"WSO2", 55.6f, null}); + stockStream.send(new Object[]{"WSO2", null, 100L}); + stockStream.send(new Object[]{null, 55.6f, 100L}); + SiddhiTestHelper.waitForEvents(waitTime, 3, wso2Count, timeout); + //assert event count + AssertJUnit.assertEquals(3, wso2Count.get()); + siddhiAppRuntime.shutdown(); + //unsubscribe from "inMemory" broker per topic + InMemoryBroker.unsubscribe(subscriberWSO2); + } + + @Test + public void jsonSinkCustomMapperTestCaseForPreserveNullValue() throws InterruptedException { + log.info("jsonSinkCustomMapperTestCaseForPreserveNullValue"); + InMemoryBroker.Subscriber subscriberWSO2 = new InMemoryBroker.Subscriber() { + @Override + public void onMessage(Object msg) { + String jsonString; + switch (wso2Count.incrementAndGet()) { + case 1: + jsonString = "{\n" + + " \"Stock Data\":{\n" + + " \"Symbol\":\"WSO2\",\n" + + " \"Price\":55.6,\n" + + " \"Volume\":null\n" + + " }\n" + + "}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + case 2: + jsonString = "{\n" + + " \"Stock Data\":{\n" + + " \"Symbol\":\"WSO2\",\n" + + " \"Price\":null,\n" + + " \"Volume\":100\n" + + " }\n" + + "}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + case 3: + jsonString = "{\n" + + " \"Stock Data\":{\n" + + " \"Symbol\":\"null\",\n" + + " \"Price\":55.6,\n" + + " \"Volume\":100\n" + + " }\n" + + "}"; + AssertJUnit.assertEquals(jsonString, msg); + break; + default: + AssertJUnit.fail(); + } + } + @Override + public String getTopic() { + return "WSO2"; + } + }; + //subscribe to "inMemory" broker per topic + InMemoryBroker.subscribe(subscriberWSO2); + String streams = "" + + "@App:name('TestSiddhiApp')" + + "define stream FooStream (symbol string, price float, volume long); " + + "@sink(type='inMemory', topic='WSO2', @map(type='json', enable.null.attribute.value='true'," + + "@payload(\"\"\"{\n" + " \"Stock Data\":{\n" + + " \"Symbol\":\"{{symbol}}\",\n" + " \"Price\":{{price}},\n" + + " \"Volume\":{{volume}}\n" + " }\n" + "}\"\"\"))) " + + "define stream BarStream (symbol string, price float, volume long); "; + String query = "" + + "from FooStream " + + "select * " + + "insert into BarStream; "; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("FooStream"); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{"WSO2", 55.6f, null}); + stockStream.send(new Object[]{"WSO2", null, 100L}); + stockStream.send(new Object[]{null, 55.6f, 100L}); + SiddhiTestHelper.waitForEvents(waitTime, 3, wso2Count, timeout); + //assert event count + AssertJUnit.assertEquals(3, wso2Count.get()); + siddhiAppRuntime.shutdown(); + //unsubscribe from "inMemory" broker per topic + InMemoryBroker.unsubscribe(subscriberWSO2); + } + + }