Skip to content

Commit

Permalink
Add test case
Browse files Browse the repository at this point in the history
  • Loading branch information
dnwick committed Mar 19, 2021
1 parent a44bc0b commit 7e0c9a1
Showing 1 changed file with 133 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1188,4 +1188,137 @@ public String getTopic() {
//unsubscribe from "inMemory" broker per topic
InMemoryBroker.unsubscribe(subscriberWSO2);
}

@Test
public void jsonSinkMapperTestCaseForPreserveNullValue() throws InterruptedException {
log.info("jsonSinkMapperTestCaseForPreserveNullValue");
InMemoryBroker.Subscriber subscriberWSO2 = new InMemoryBroker.Subscriber() {
@Override
public void onMessage(Object msg) {
String jsonString;
switch (wso2Count.incrementAndGet()) {
case 1:
jsonString = "{\"event\":{\"symbol\":\"WSO2\",\"price\":55.6,\"volume\":null}}";
AssertJUnit.assertEquals(jsonString, msg);
break;
case 2:
jsonString = "{\"event\":{\"symbol\":\"WSO2\",\"price\":null,\"volume\":100}}";
AssertJUnit.assertEquals(jsonString, msg);
break;
case 3:
jsonString = "{\"event\":{\"symbol\":null,\"price\":55.6,\"volume\":100}}";
AssertJUnit.assertEquals(jsonString, msg);
break;
default:
AssertJUnit.fail();
}
}
@Override
public String getTopic() {
return "WSO2";
}
};
//subscribe to "inMemory" broker per topic
InMemoryBroker.subscribe(subscriberWSO2);
String streams = "" +
"@App:name('TestSiddhiApp')" +
"define stream FooStream (symbol string, price float, volume long); " +
"@sink(type='inMemory', topic='WSO2', @map(type='json', enable.null.attribute.value='true')) " +
"define stream BarStream (symbol string, price float, volume long); ";
String query = "" +
"from FooStream " +
"select * " +
"insert into BarStream; ";
SiddhiManager siddhiManager = new SiddhiManager();
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
InputHandler stockStream = siddhiAppRuntime.getInputHandler("FooStream");
siddhiAppRuntime.start();
stockStream.send(new Object[]{"WSO2", 55.6f, null});
stockStream.send(new Object[]{"WSO2", null, 100L});
stockStream.send(new Object[]{null, 55.6f, 100L});
SiddhiTestHelper.waitForEvents(waitTime, 3, wso2Count, timeout);
//assert event count
AssertJUnit.assertEquals(3, wso2Count.get());
siddhiAppRuntime.shutdown();
//unsubscribe from "inMemory" broker per topic
InMemoryBroker.unsubscribe(subscriberWSO2);
}

@Test
public void jsonSinkCustomMapperTestCaseForPreserveNullValue() throws InterruptedException {
log.info("jsonSinkCustomMapperTestCaseForPreserveNullValue");
InMemoryBroker.Subscriber subscriberWSO2 = new InMemoryBroker.Subscriber() {
@Override
public void onMessage(Object msg) {
String jsonString;
switch (wso2Count.incrementAndGet()) {
case 1:
jsonString = "{\n" +
" \"Stock Data\":{\n" +
" \"Symbol\":\"WSO2\",\n" +
" \"Price\":55.6,\n" +
" \"Volume\":null\n" +
" }\n" +
"}";
AssertJUnit.assertEquals(jsonString, msg);
break;
case 2:
jsonString = "{\n" +
" \"Stock Data\":{\n" +
" \"Symbol\":\"WSO2\",\n" +
" \"Price\":null,\n" +
" \"Volume\":100\n" +
" }\n" +
"}";
AssertJUnit.assertEquals(jsonString, msg);
break;
case 3:
jsonString = "{\n" +
" \"Stock Data\":{\n" +
" \"Symbol\":\"null\",\n" +
" \"Price\":55.6,\n" +
" \"Volume\":100\n" +
" }\n" +
"}";
AssertJUnit.assertEquals(jsonString, msg);
break;
default:
AssertJUnit.fail();
}
}
@Override
public String getTopic() {
return "WSO2";
}
};
//subscribe to "inMemory" broker per topic
InMemoryBroker.subscribe(subscriberWSO2);
String streams = "" +
"@App:name('TestSiddhiApp')" +
"define stream FooStream (symbol string, price float, volume long); " +
"@sink(type='inMemory', topic='WSO2', @map(type='json', enable.null.attribute.value='true',"
+ "@payload(\"\"\"{\n" + " \"Stock Data\":{\n"
+ " \"Symbol\":\"{{symbol}}\",\n" + " \"Price\":{{price}},\n"
+ " \"Volume\":{{volume}}\n" + " }\n" + "}\"\"\"))) "
+ "define stream BarStream (symbol string, price float, volume long); ";
String query = "" +
"from FooStream " +
"select * " +
"insert into BarStream; ";
SiddhiManager siddhiManager = new SiddhiManager();
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
InputHandler stockStream = siddhiAppRuntime.getInputHandler("FooStream");
siddhiAppRuntime.start();
stockStream.send(new Object[]{"WSO2", 55.6f, null});
stockStream.send(new Object[]{"WSO2", null, 100L});
stockStream.send(new Object[]{null, 55.6f, 100L});
SiddhiTestHelper.waitForEvents(waitTime, 3, wso2Count, timeout);
//assert event count
AssertJUnit.assertEquals(3, wso2Count.get());
siddhiAppRuntime.shutdown();
//unsubscribe from "inMemory" broker per topic
InMemoryBroker.unsubscribe(subscriberWSO2);
}


}

0 comments on commit 7e0c9a1

Please sign in to comment.