Skip to content

Commit

Permalink
Merge pull request #102 from dnwick/master
Browse files Browse the repository at this point in the history
Allow null values to be output as null
  • Loading branch information
dnwick authored Mar 22, 2021
2 parents df59ccb + 7e0c9a1 commit cb96996
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.siddhi.extension.map.json.sinkmapper;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.siddhi.annotation.Example;
Expand Down Expand Up @@ -79,7 +80,13 @@
"'true' or the value can be set to 'false' to separate events",
type = {DataType.BOOL},
optional = true,
defaultValue = "true")
defaultValue = "true"),
@Parameter(name = "enable.null.attribute.value",
description = "If this parameter is true, output parameter values will contain null values " +
"if not they will be undefined",
type = {DataType.BOOL},
optional = true,
defaultValue = "false")
},
examples = {
@Example(
Expand Down Expand Up @@ -119,6 +126,7 @@ public class JsonSinkMapper extends SinkMapper {
private static final String DEFAULT_ENCLOSING_ELEMENT = "$";
private static final String JSON_VALIDATION_IDENTIFIER = "validate.json";
private static final String EVENT_GROUPING_ENABLED = "event.grouping.enabled";
private static final String ENABLE_NULL_ATTRIBUTE_VALUE = "enable.null.attribute.value";
private static final String JSON_EVENT_SEPERATOR = ",";
private static final String JSON_KEYVALUE_SEPERATOR = ":";
private static final String JSON_ARRAY_START_SYMBOL = "[";
Expand All @@ -131,6 +139,7 @@ public class JsonSinkMapper extends SinkMapper {
private String enclosingElement = null;
private boolean isJsonValidationEnabled = false;
private boolean eventGroupingEnabled = true;
private boolean enableNullAttributeValue = false;


@Override
Expand Down Expand Up @@ -162,6 +171,8 @@ public void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
.validateAndGetStaticValue(JSON_VALIDATION_IDENTIFIER, "false"));
this.eventGroupingEnabled = Boolean.parseBoolean(optionHolder
.validateAndGetStaticValue(EVENT_GROUPING_ENABLED, "true"));
this.enableNullAttributeValue = Boolean.parseBoolean(optionHolder
.validateAndGetStaticValue(ENABLE_NULL_ATTRIBUTE_VALUE, "false"));

//if @payload() is added there must be at least 1 element in it, otherwise a SiddhiParserException raised
if (payloadTemplateBuilderMap != null && payloadTemplateBuilderMap.size() != 1) {
Expand Down Expand Up @@ -245,12 +256,23 @@ private String constructJsonForDefaultMapping(Object eventObj) {
}
if (eventObj instanceof Event) {
Event event = (Event) eventObj;
JsonObject jsonEvent = constructSingleEventForDefaultMapping(doPartialProcessing(event));
JsonObject jsonEvent;
if (enableNullAttributeValue) {
jsonEvent = constructSingleEventForDefaultMapping(event);
} else {
jsonEvent = constructSingleEventForDefaultMapping(doPartialProcessing(event));
}

sb.append(jsonEvent);
} else if (eventObj instanceof Event[]) {
JsonArray eventArray = new JsonArray();
for (Event event : (Event[]) eventObj) {
eventArray.add(constructSingleEventForDefaultMapping(doPartialProcessing(event)));
if (enableNullAttributeValue) {
eventArray.add(constructSingleEventForDefaultMapping(event));
} else {
eventArray.add(constructSingleEventForDefaultMapping(doPartialProcessing(event)));
}

}
sb.append(eventArray.toString());
} else {
Expand All @@ -265,12 +287,23 @@ private String constructJsonForDefaultMapping(Object eventObj) {
} else {
if (eventObj instanceof Event) {
Event event = (Event) eventObj;
JsonObject jsonEvent = constructSingleEventForDefaultMapping(doPartialProcessing(event));
JsonObject jsonEvent;
if (enableNullAttributeValue) {
jsonEvent = constructSingleEventForDefaultMapping(event);
} else {
jsonEvent = constructSingleEventForDefaultMapping(doPartialProcessing(event));
}

return jsonEvent.toString();
} else if (eventObj instanceof Event[]) {
JsonArray eventArray = new JsonArray();
for (Event event : (Event[]) eventObj) {
eventArray.add(constructSingleEventForDefaultMapping(doPartialProcessing(event)));
if (enableNullAttributeValue) {
eventArray.add(constructSingleEventForDefaultMapping(event));
} else {
eventArray.add(constructSingleEventForDefaultMapping(doPartialProcessing(event)));
}

}
return (eventArray.toString());
} else {
Expand Down Expand Up @@ -298,13 +331,23 @@ private String constructJsonForCustomMapping(Object eventObj, TemplateBuilder pa
}
}
if (eventObj instanceof Event) {
Event event = doPartialProcessing((Event) eventObj);
Event event;
if (enableNullAttributeValue) {
event = (Event) eventObj;
} else {
event = doPartialProcessing((Event) eventObj);
}
sb.append(payloadTemplateBuilder.build(event));
} else if (eventObj instanceof Event[]) {
String jsonEvent;
sb.append(JSON_ARRAY_START_SYMBOL);
for (Event e : (Event[]) eventObj) {
jsonEvent = (String) payloadTemplateBuilder.build(doPartialProcessing(e));
if (enableNullAttributeValue) {
jsonEvent = (String) payloadTemplateBuilder.build(e);
} else {
jsonEvent = (String) payloadTemplateBuilder.build(doPartialProcessing(e));
}

if (jsonEvent != null) {
sb.append(jsonEvent).append(JSON_EVENT_SEPERATOR).append("\n");
}
Expand All @@ -322,12 +365,21 @@ private String constructJsonForCustomMapping(Object eventObj, TemplateBuilder pa
return sb.toString();
} else {
if (eventObj.getClass() == Event.class) {
return (String) payloadTemplateBuilder.build(doPartialProcessing((Event) eventObj));
if (enableNullAttributeValue) {
return (String) payloadTemplateBuilder.build((Event) eventObj);
} else {
return (String) payloadTemplateBuilder.build(doPartialProcessing((Event) eventObj));
}
} else if (eventObj.getClass() == Event[].class) {
String jsonEvent;
sb.append(JSON_ARRAY_START_SYMBOL);
for (Event event : (Event[]) eventObj) {
jsonEvent = (String) payloadTemplateBuilder.build(doPartialProcessing(event));
if (enableNullAttributeValue) {
jsonEvent = (String) payloadTemplateBuilder.build(event);
} else {
jsonEvent = (String) payloadTemplateBuilder.build(doPartialProcessing(event));
}

if (jsonEvent != null) {
sb.append(jsonEvent).append(JSON_EVENT_SEPERATOR).append("\n");
}
Expand All @@ -349,7 +401,16 @@ private JsonObject constructSingleEventForDefaultMapping(Event event) {
JsonObject innerParentObject = new JsonObject();
String attributeName;
Object attributeValue;
Gson gson = new Gson();
Gson gson;
if (enableNullAttributeValue) {
gson = new GsonBuilder()
.serializeNulls()
.setPrettyPrinting()
.create();
} else {
gson = new Gson();
}

for (int i = 0; i < data.length; i++) {
attributeName = attributeNameArray[i];
attributeValue = data[i];
Expand All @@ -365,6 +426,8 @@ private JsonObject constructSingleEventForDefaultMapping(Event event) {
innerParentObject.add(attributeName, gson.toJsonTree(attributeValue));
}
}
} else {
innerParentObject.add(attributeName, null);
}
}
jsonEventObject.add(EVENT_PARENT_TAG, innerParentObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1188,4 +1188,137 @@ public String getTopic() {
//unsubscribe from "inMemory" broker per topic
InMemoryBroker.unsubscribe(subscriberWSO2);
}

@Test
public void jsonSinkMapperTestCaseForPreserveNullValue() throws InterruptedException {
log.info("jsonSinkMapperTestCaseForPreserveNullValue");
InMemoryBroker.Subscriber subscriberWSO2 = new InMemoryBroker.Subscriber() {
@Override
public void onMessage(Object msg) {
String jsonString;
switch (wso2Count.incrementAndGet()) {
case 1:
jsonString = "{\"event\":{\"symbol\":\"WSO2\",\"price\":55.6,\"volume\":null}}";
AssertJUnit.assertEquals(jsonString, msg);
break;
case 2:
jsonString = "{\"event\":{\"symbol\":\"WSO2\",\"price\":null,\"volume\":100}}";
AssertJUnit.assertEquals(jsonString, msg);
break;
case 3:
jsonString = "{\"event\":{\"symbol\":null,\"price\":55.6,\"volume\":100}}";
AssertJUnit.assertEquals(jsonString, msg);
break;
default:
AssertJUnit.fail();
}
}
@Override
public String getTopic() {
return "WSO2";
}
};
//subscribe to "inMemory" broker per topic
InMemoryBroker.subscribe(subscriberWSO2);
String streams = "" +
"@App:name('TestSiddhiApp')" +
"define stream FooStream (symbol string, price float, volume long); " +
"@sink(type='inMemory', topic='WSO2', @map(type='json', enable.null.attribute.value='true')) " +
"define stream BarStream (symbol string, price float, volume long); ";
String query = "" +
"from FooStream " +
"select * " +
"insert into BarStream; ";
SiddhiManager siddhiManager = new SiddhiManager();
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
InputHandler stockStream = siddhiAppRuntime.getInputHandler("FooStream");
siddhiAppRuntime.start();
stockStream.send(new Object[]{"WSO2", 55.6f, null});
stockStream.send(new Object[]{"WSO2", null, 100L});
stockStream.send(new Object[]{null, 55.6f, 100L});
SiddhiTestHelper.waitForEvents(waitTime, 3, wso2Count, timeout);
//assert event count
AssertJUnit.assertEquals(3, wso2Count.get());
siddhiAppRuntime.shutdown();
//unsubscribe from "inMemory" broker per topic
InMemoryBroker.unsubscribe(subscriberWSO2);
}

@Test
public void jsonSinkCustomMapperTestCaseForPreserveNullValue() throws InterruptedException {
log.info("jsonSinkCustomMapperTestCaseForPreserveNullValue");
InMemoryBroker.Subscriber subscriberWSO2 = new InMemoryBroker.Subscriber() {
@Override
public void onMessage(Object msg) {
String jsonString;
switch (wso2Count.incrementAndGet()) {
case 1:
jsonString = "{\n" +
" \"Stock Data\":{\n" +
" \"Symbol\":\"WSO2\",\n" +
" \"Price\":55.6,\n" +
" \"Volume\":null\n" +
" }\n" +
"}";
AssertJUnit.assertEquals(jsonString, msg);
break;
case 2:
jsonString = "{\n" +
" \"Stock Data\":{\n" +
" \"Symbol\":\"WSO2\",\n" +
" \"Price\":null,\n" +
" \"Volume\":100\n" +
" }\n" +
"}";
AssertJUnit.assertEquals(jsonString, msg);
break;
case 3:
jsonString = "{\n" +
" \"Stock Data\":{\n" +
" \"Symbol\":\"null\",\n" +
" \"Price\":55.6,\n" +
" \"Volume\":100\n" +
" }\n" +
"}";
AssertJUnit.assertEquals(jsonString, msg);
break;
default:
AssertJUnit.fail();
}
}
@Override
public String getTopic() {
return "WSO2";
}
};
//subscribe to "inMemory" broker per topic
InMemoryBroker.subscribe(subscriberWSO2);
String streams = "" +
"@App:name('TestSiddhiApp')" +
"define stream FooStream (symbol string, price float, volume long); " +
"@sink(type='inMemory', topic='WSO2', @map(type='json', enable.null.attribute.value='true',"
+ "@payload(\"\"\"{\n" + " \"Stock Data\":{\n"
+ " \"Symbol\":\"{{symbol}}\",\n" + " \"Price\":{{price}},\n"
+ " \"Volume\":{{volume}}\n" + " }\n" + "}\"\"\"))) "
+ "define stream BarStream (symbol string, price float, volume long); ";
String query = "" +
"from FooStream " +
"select * " +
"insert into BarStream; ";
SiddhiManager siddhiManager = new SiddhiManager();
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
InputHandler stockStream = siddhiAppRuntime.getInputHandler("FooStream");
siddhiAppRuntime.start();
stockStream.send(new Object[]{"WSO2", 55.6f, null});
stockStream.send(new Object[]{"WSO2", null, 100L});
stockStream.send(new Object[]{null, 55.6f, 100L});
SiddhiTestHelper.waitForEvents(waitTime, 3, wso2Count, timeout);
//assert event count
AssertJUnit.assertEquals(3, wso2Count.get());
siddhiAppRuntime.shutdown();
//unsubscribe from "inMemory" broker per topic
InMemoryBroker.unsubscribe(subscriberWSO2);
}


}

0 comments on commit cb96996

Please sign in to comment.