Skip to content

Commit

Permalink
Merge pull request #104 from dilini-muthumala/master
Browse files Browse the repository at this point in the history
Add nested Json testcase and Bump siddhi version as needed
  • Loading branch information
dilini-muthumala authored Jun 7, 2022
2 parents 3b391b8 + 6ca3f7a commit 93d21b1
Showing 1 changed file with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.siddhi.core.event.Event;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.core.util.EventPrinter;
import io.siddhi.core.util.SiddhiTestHelper;
import io.siddhi.core.util.transport.InMemoryBroker;
import org.apache.logging.log4j.LogManager;
Expand All @@ -41,11 +43,13 @@ public class JsonSinkMapperTestCase {
private final int timeout = 30000;
private AtomicInteger wso2Count = new AtomicInteger(0);
private AtomicInteger ibmCount = new AtomicInteger(0);
private volatile int count;

@BeforeMethod
public void init() {
wso2Count.set(0);
ibmCount.set(0);
count = 0;
}

/*
Expand Down Expand Up @@ -1321,5 +1325,51 @@ public String getTopic() {
InMemoryBroker.unsubscribe(subscriberWSO2);
}

@Test
public void jsonSinkMapperTestCaseForNestedJson() {
// see https://github.com/wso2/streaming-integrator/issues/228
log.info("JsonSinkMapperTestCase for a nested Json");

String app = "" +
"@App:name(\"PreDefinedProducerApp\")\n" +
"define stream DataStream" +
"(PublisherName string,AlertCategory String,AlertType String,recipientType String);\n" +
"@sink(type='log',\n" +
"@map(type='json', enclosing.element='event', validate.json='true', " +
"@payload(\"\"\"{\"alertTypeData\":{\"PublisherName\":\"{{PublisherName}}\",\"AlertCategory\"" +
":\"{{AlertCategory}}\",\"AlertType\":\"{{AlertType}}\"},\"recipientData\":{\"recipientType\":" +
"\"{{recipientType}}\"}}\"\"\")))\n" +
"define stream DataWriteStream (PublisherName string, AlertCategory string, " +
"AlertType string, recipientType string);\n" +
"from DataStream\n" +
"select PublisherName, AlertCategory, AlertType , recipientType\n" +
"insert into DataWriteStream; ";

SiddhiManager siddhiManager = new SiddhiManager();
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app);
siddhiAppRuntime.addCallback("DataWriteStream", new StreamCallback() {

@Override
public void receive(Event[] events) {
EventPrinter.print(events);
count += events.length;
}
});
InputHandler stockStream = siddhiAppRuntime.getInputHandler("DataStream");
siddhiAppRuntime.start();
try {
stockStream.send(new Object[]{"WSO2", "IBM", "FOO", "BAR"});
} catch (InterruptedException e) {
AssertJUnit.fail("Failed to send the message. cause: " + e.getMessage());
}

try {
Thread.sleep(100);
} catch (InterruptedException e) {
AssertJUnit.fail("Failed to run the testcase. cause: " + e.getMessage());
}
siddhiAppRuntime.shutdown();
AssertJUnit.assertEquals(1, count);
}

}

0 comments on commit 93d21b1

Please sign in to comment.