From 01d61f5b8cc15ad253505ed3f72e99da968dfa85 Mon Sep 17 00:00:00 2001 From: "sketharan1996.15@cse.mrt.ac.lk" Date: Fri, 21 Sep 2018 12:25:56 +0530 Subject: [PATCH] Some code refactoring done. --- .../appcreator/AbstractSiddhiAppCreator.java | 38 +- .../core/appcreator/SPMBSiddhiAppCreator.java | 79 ++- .../core/util/ResourceManagerConstants.java | 49 +- .../jobmanager/core/JmsTransportTestCase.java | 664 ++++++++++-------- 4 files changed, 474 insertions(+), 356 deletions(-) diff --git a/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/appcreator/AbstractSiddhiAppCreator.java b/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/appcreator/AbstractSiddhiAppCreator.java index c7c4c0fdcf3..50e3f80f82b 100644 --- a/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/appcreator/AbstractSiddhiAppCreator.java +++ b/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/appcreator/AbstractSiddhiAppCreator.java @@ -30,20 +30,21 @@ import java.util.Map; /** - * Abstract implementation of {@link SiddhiAppCreator}. Developers can use this extension point to implement custom - * Siddhi App Creator based on the distribute implementation. + * Abstract implementation of {@link SiddhiAppCreator}. Developers can use this extension point + * to implement custom Siddhi App Creator based on the distribute implementation. */ public abstract class AbstractSiddhiAppCreator implements SiddhiAppCreator { protected boolean transportChannelCreationEnabled; public List createApps(SiddhiTopology topology) { transportChannelCreationEnabled = topology.isTransportChannelCreationEnabled(); - List deployableSiddhiQueryGroupList = new ArrayList<>(topology.getQueryGroupList - ().size()); + List deployableSiddhiQueryGroupList = + new ArrayList<>(topology.getQueryGroupList().size()); for (SiddhiQueryGroup queryGroup : topology.getQueryGroupList()) { - DeployableSiddhiQueryGroup deployableQueryGroup = new DeployableSiddhiQueryGroup(queryGroup.getName(), - queryGroup.isReceiverQueryGroup(), - queryGroup.getParallelism()); + DeployableSiddhiQueryGroup deployableQueryGroup = + new DeployableSiddhiQueryGroup(queryGroup.getName(), + queryGroup.isReceiverQueryGroup(), + queryGroup.getParallelism()); deployableQueryGroup.setSiddhiQueries(createApps(topology.getName(), queryGroup)); deployableSiddhiQueryGroupList.add(deployableQueryGroup); } @@ -51,26 +52,29 @@ public List createApps(SiddhiTopology topology) { } /** - * This method should return valid concrete Siddhi App/s as Strings. No. of returned Siddhi Apps should equal the - * parallelism count for query group. + * This method should return valid concrete Siddhi App/s as Strings. No. of returned Siddhi + * Apps should equal the parallelism count for query group. * * @param queryGroup Input query group to produce Siddhi Apps. * @return List of valid concrete Siddhi Apps as String. */ - protected abstract List createApps(String siddhiAppName, SiddhiQueryGroup queryGroup); + protected abstract List createApps(String siddhiAppName, + SiddhiQueryGroup queryGroup); - protected List getPartitionNumbers(int appParallelism, int availablePartitionCount, int currentAppNum) { + protected List getPartitionNumbers(int appParallelism, int availablePartitionCount, + int currentAppNum) { List partitionNumbers = new ArrayList<>(); if (availablePartitionCount == appParallelism) { partitionNumbers.add(currentAppNum); return partitionNumbers; } else { - //availablePartitionCount < appParallelism scenario cannot occur according to design. Hence if - // availablePartitionCount > appParallelism + //availablePartitionCount < appParallelism scenario cannot occur according to design. + // Hence if availablePartitionCount > appParallelism //// TODO: 10/19/17 improve logic int partitionsPerNode = availablePartitionCount / appParallelism; if (currentAppNum + 1 == appParallelism) { //if last app - int remainingPartitions = availablePartitionCount - ((appParallelism - 1) * partitionsPerNode); + int remainingPartitions = availablePartitionCount - ((appParallelism - 1) * + partitionsPerNode); for (int j = 0; j < remainingPartitions; j++) { partitionNumbers.add((currentAppNum * partitionsPerNode) + j); } @@ -84,14 +88,16 @@ protected List getPartitionNumbers(int appParallelism, int availablePar } } - protected List generateQueryList(String queryTemplate, String queryGroupName, int parallelism) { + protected List generateQueryList(String queryTemplate, String queryGroupName, + int parallelism) { List queries = new ArrayList<>(parallelism); for (int i = 0; i < parallelism; i++) { Map valuesMap = new HashMap<>(1); String appName = queryGroupName + "-" + (i + 1); valuesMap.put(ResourceManagerConstants.APP_NAME, appName); StrSubstitutor substitutor = new StrSubstitutor(valuesMap); - queries.add(new SiddhiQuery(appName, substitutor.replace(queryTemplate), false)); + queries.add(new SiddhiQuery(appName, substitutor.replace(queryTemplate), + false)); } return queries; } diff --git a/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/appcreator/SPMBSiddhiAppCreator.java b/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/appcreator/SPMBSiddhiAppCreator.java index 81590cd7da8..5528206de2a 100644 --- a/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/appcreator/SPMBSiddhiAppCreator.java +++ b/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/appcreator/SPMBSiddhiAppCreator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except @@ -60,8 +60,8 @@ protected List createApps(String siddhiAppName, SiddhiQueryGroup qu * @param queryList Contains the query of the current execution group replicated * to the parallelism of the group. * @param outputStreams Collection of current execution group's output streams - * - * Assign the jms transport headers for each instance siddhi applications output streams for given execution group + * Assign the jms transport headers for each instance siddhi applications output streams + * for given execution group */ private void processOutputStreams(String siddhiAppName, List queryList, Collection outputStreams) { @@ -75,16 +75,18 @@ private void processOutputStreams(String siddhiAppName, List queryL for (PublishingStrategyDataHolder holder : outputStream.getPublishingStrategyList()) { sinkValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" + - outputStream.getStreamName() + (holder.getGroupingField() == null ? "" : ("_" + holder - .getGroupingField()))); + outputStream.getStreamName() + (holder.getGroupingField() == null ? "" + : ("_" + holder.getGroupingField()))); if (holder.getStrategy() == TransportStrategy.FIELD_GROUPING) { if (partitionKeys.get(holder.getGroupingField()) != null && - partitionKeys.get(holder.getGroupingField()) > holder.getParallelism()) { + partitionKeys.get(holder.getGroupingField()) > holder.getParallelism()) + { continue; } partitionKeys.put(holder.getGroupingField(), holder.getParallelism()); - sinkValuesMap.put(ResourceManagerConstants.PARTITION_KEY, holder.getGroupingField()); + sinkValuesMap.put(ResourceManagerConstants.PARTITION_KEY, + holder.getGroupingField()); List destinations = new ArrayList<>(holder.getParallelism()); for (int i = 0; i < holder.getParallelism(); i++) { @@ -92,32 +94,39 @@ private void processOutputStreams(String siddhiAppName, List queryL destinationMap.put(ResourceManagerConstants.PARTITION_TOPIC, sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION) + "_" + String.valueOf(i)); - destinations.add(getUpdatedQuery(ResourceManagerConstants.DESTINATION_TOPIC, destinationMap)); + destinations.add(getUpdatedQuery(ResourceManagerConstants.DESTINATION_TOPIC, + destinationMap)); } sinkValuesMap.put(ResourceManagerConstants.DESTINATIONS, StringUtils.join(destinations, ",")); - String sinkString = getUpdatedQuery(ResourceManagerConstants.PARTITIONED_MB_SINK_TEMPLATE, + String sinkString = + getUpdatedQuery(ResourceManagerConstants.PARTITIONED_MB_SINK_TEMPLATE, sinkValuesMap); - sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION), sinkString); + sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION), + sinkString); } else if (holder.getStrategy() == TransportStrategy.ROUND_ROBIN) { //if holder uses RR as strategy then unique topic name will be defined - sinkValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" + - outputStream.getStreamName() + "_" + String.valueOf(rrHolderCount)); + sinkValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + + "_" + outputStream.getStreamName() + "_" + + String.valueOf(rrHolderCount)); rrHolderCount++; - String sinkString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_QUEUE_SINK_TEMPLATE, - sinkValuesMap); - sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION), sinkString); + String sinkString = getUpdatedQuery(ResourceManagerConstants + .DEFAULT_MB_QUEUE_SINK_TEMPLATE, sinkValuesMap); + sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION) + , sinkString); } else if (holder.getStrategy() == TransportStrategy.ALL) { - String sinkString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_TOPIC_SINK_TEMPLATE, - sinkValuesMap); - sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION), sinkString); + String sinkString = getUpdatedQuery(ResourceManagerConstants + .DEFAULT_MB_TOPIC_SINK_TEMPLATE, sinkValuesMap); + sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION) + , sinkString); } } Map queryValuesMap = new HashMap<>(1); - queryValuesMap.put(outputStream.getStreamName(), StringUtils.join(sinkList.values(), "\n")); + queryValuesMap.put(outputStream.getStreamName(), + StringUtils.join(sinkList.values(), "\n")); updateQueryList(queryList, queryValuesMap); } } @@ -128,7 +137,6 @@ private void processOutputStreams(String siddhiAppName, List queryL * @param queryList Contains the query of the current execution group replicated * to the parallelism of the group. * @param inputStreams Collection of current execution group's input streams - * * Assign the jms transport headers for each instance siddhi applications input * streams for a given execution group */ @@ -137,31 +145,34 @@ private void processInputStreams(String siddhiAppName, List queryLi Map sourceValuesMap = new HashMap<>(); for (InputStreamDataHolder inputStream : inputStreams) { - SubscriptionStrategyDataHolder subscriptionStrategy = inputStream.getSubscriptionStrategy(); + SubscriptionStrategyDataHolder subscriptionStrategy = inputStream + .getSubscriptionStrategy(); sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" + - inputStream.getStreamName() + (inputStream.getSubscriptionStrategy().getPartitionKey() == - null ? "" : ("_" + inputStream.getSubscriptionStrategy().getPartitionKey()))); + inputStream.getStreamName() + (inputStream.getSubscriptionStrategy() + .getPartitionKey() == null ? "" : ("_" + inputStream.getSubscriptionStrategy() + .getPartitionKey()))); if (!inputStream.isUserGiven()) { if (subscriptionStrategy.getStrategy() == TransportStrategy.FIELD_GROUPING) { for (int i = 0; i < queryList.size(); i++) { List sourceQueries = new ArrayList<>(); - List partitionNumbers = getPartitionNumbers(queryList.size(), subscriptionStrategy + List partitionNumbers = getPartitionNumbers(queryList.size(), + subscriptionStrategy .getOfferedParallelism(), i); for (int topicCount : partitionNumbers) { String topicName = siddhiAppName + "_" + inputStream.getStreamName() + "_" + inputStream.getSubscriptionStrategy().getPartitionKey() + "_" + Integer.toString(topicCount); sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, topicName); - String sourceQuery = - getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_TOPIC_SOURCE_TEMPLATE, - sourceValuesMap); + String sourceQuery = getUpdatedQuery(ResourceManagerConstants + .DEFAULT_MB_TOPIC_SOURCE_TEMPLATE, sourceValuesMap); sourceQueries.add(sourceQuery); } String combinedQueryHeader = StringUtils.join(sourceQueries, "\n"); Map queryValuesMap = new HashMap<>(1); queryValuesMap.put(inputStream.getStreamName(), combinedQueryHeader); - String updatedQuery = getUpdatedQuery(queryList.get(i).getApp(), queryValuesMap); + String updatedQuery = getUpdatedQuery(queryList.get(i).getApp() + , queryValuesMap); queryList.get(i).setApp(updatedQuery); } } else if (subscriptionStrategy.getStrategy() == TransportStrategy.ROUND_ROBIN) { @@ -176,18 +187,18 @@ private void processInputStreams(String siddhiAppName, List queryLi queueCount += 1; rrTrackerMap.put(inputStream.getStreamName(), queueCount); sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, queueName); - String sourceString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_QUEUE_SOURCE_TEMPLATE, - sourceValuesMap); + String sourceString = getUpdatedQuery(ResourceManagerConstants + .DEFAULT_MB_QUEUE_SOURCE_TEMPLATE, sourceValuesMap); Map queryValuesMap = new HashMap<>(1); queryValuesMap.put(inputStream.getStreamName(), sourceString); updateQueryList(queryList, queryValuesMap); } else { - sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" + - inputStream.getStreamName()); + sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + + "_" + inputStream.getStreamName()); for (SiddhiQuery aQueryList : queryList) { - String sourceString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_TOPIC_SOURCE_TEMPLATE, - sourceValuesMap); + String sourceString = getUpdatedQuery(ResourceManagerConstants + .DEFAULT_MB_TOPIC_SOURCE_TEMPLATE, sourceValuesMap); Map queryValuesMap = new HashMap<>(1); queryValuesMap.put(inputStream.getStreamName(), sourceString); String updatedQuery = getUpdatedQuery(aQueryList.getApp(), queryValuesMap); diff --git a/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/util/ResourceManagerConstants.java b/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/util/ResourceManagerConstants.java index b8a026228b6..d93d422c0bd 100644 --- a/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/util/ResourceManagerConstants.java +++ b/components/org.wso2.carbon.sp.jobmanager.core/src/main/java/org/wso2/carbon/sp/jobmanager/core/util/ResourceManagerConstants.java @@ -47,50 +47,55 @@ public class ResourceManagerConstants { public static final String PARTITION_NO = "partitionNo"; public static final String MAPPING = "xml"; - public static final String DEFAULT_KAFKA_SOURCE_TEMPLATE = "@source(type='kafka', topic.list='${" + TOPIC_LIST + - "}', group.id='${" + CONSUMER_GROUP_ID + "}', threading.option='single.thread', bootstrap.servers='${" + public static final String DEFAULT_KAFKA_SOURCE_TEMPLATE = "@source(type='kafka', topic.list=" + + "'${" + TOPIC_LIST + "}', group.id='${" + CONSUMER_GROUP_ID + "}'," + + " threading.option='single.thread', bootstrap.servers='${" + BOOTSTRAP_SERVER_URL + "}', @map(type='" + MAPPING + "'))"; public static final String PARTITIONED_KAFKA_SOURCE_TEMPLATE = - "@source(type='kafka', topic.list='${" + TOPIC_LIST + "}', group.id='${" + CONSUMER_GROUP_ID + "}', " - + "threading.option='partition.wise', bootstrap.servers='${" + BOOTSTRAP_SERVER_URL + "}', " + "@source(type='kafka', topic.list='${" + TOPIC_LIST + "}', group.id='${" + + CONSUMER_GROUP_ID + "}', threading.option='partition.wise'," + + " bootstrap.servers='${" + BOOTSTRAP_SERVER_URL + "}', " + "partition.no.list='${" + PARTITION_LIST + "}',@map(type='" + MAPPING + "'))"; - public static final String DEFAULT_KAFKA_SINK_TEMPLATE = "@sink(type='kafka', topic='${" + TOPIC_LIST + - "}' , bootstrap.servers='${" + BOOTSTRAP_SERVER_URL + "}', @map(type='" + MAPPING + "'))"; + public static final String DEFAULT_KAFKA_SINK_TEMPLATE = "@sink(type='kafka', topic='${" + + TOPIC_LIST + "}' , bootstrap.servers='${" + BOOTSTRAP_SERVER_URL + "}', @map(type='" + + MAPPING + "'))"; - public static final String PARTITIONED_KAFKA_SINK_TEMPLATE = "@sink(type='kafka', topic='${" + TOPIC_LIST + - "}' , bootstrap.servers='${" + BOOTSTRAP_SERVER_URL + "}', @map(type='" + MAPPING + "'), @distribution" - + "(strategy='partitioned', partitionKey='${" + PARTITION_KEY + "}', ${" + DESTINATIONS + "} ))"; + public static final String PARTITIONED_KAFKA_SINK_TEMPLATE = "@sink(type='kafka', topic='${" + + TOPIC_LIST + "}' , bootstrap.servers='${" + BOOTSTRAP_SERVER_URL + "}', @map(type='" + + MAPPING + "'), @distribution(strategy='partitioned', partitionKey='${" + + PARTITION_KEY + "}', ${" + DESTINATIONS + "} ))"; public static final String PARTITION_TOPIC = "partitionTopic"; public static final String MB_DESTINATION = "mbDestination"; - public static final String DESTINATION_TOPIC = "@destination(destination = '${" + PARTITION_TOPIC + "}')"; + public static final String DESTINATION_TOPIC = "@destination(destination = '${" + + PARTITION_TOPIC + "}')"; public static final String DEFAULT_MB_TOPIC_SOURCE_TEMPLATE = "@source(type='jms'," + "factory.initial='org.wso2.andes.jndi.PropertiesFileInitialContextFactory'," + "provider.url='../../resources/jndi.properties',connection.factory.type='topic'," - + "destination ='${" + MB_DESTINATION + "}' , connection.factory.jndi.name='TopicConnectionFactory'," - + "@map(type='" + MAPPING + "'))"; + + "destination ='${" + MB_DESTINATION + "}' , connection.factory.jndi.name=" + + "'TopicConnectionFactory',@map(type='" + MAPPING + "'))"; public static final String DEFAULT_MB_QUEUE_SOURCE_TEMPLATE = "@source(type='jms'," + "factory.initial='org.wso2.andes.jndi.PropertiesFileInitialContextFactory'," + "provider.url='../../resources/jndi.properties',connection.factory.type='queue'," - + "destination ='${" + MB_DESTINATION + "}',connection.factory.jndi.name='QueueConnectionFactory'," - + "@map(type ='" + MAPPING + "'))"; + + "destination ='${" + MB_DESTINATION + "}',connection.factory.jndi.name=" + + "'QueueConnectionFactory',@map(type ='" + MAPPING + "'))"; public static final String DEFAULT_MB_TOPIC_SINK_TEMPLATE = "@sink(type='jms'," + "factory.initial='org.wso2.andes.jndi.PropertiesFileInitialContextFactory'," + "provider.url='../../resources/jndi.properties',connection.factory.type='topic'," - + "destination = '${" + MB_DESTINATION + "}', connection.factory.jndi.name='TopicConnectionFactory'," - + "@map(type='" + MAPPING + "'))"; + + "destination = '${" + MB_DESTINATION + "}', connection.factory.jndi.name=" + + "'TopicConnectionFactory',@map(type='" + MAPPING + "'))"; public static final String DEFAULT_MB_QUEUE_SINK_TEMPLATE = "@sink(type='jms'," + "factory.initial='org.wso2.andes.jndi.PropertiesFileInitialContextFactory'," + "provider.url='../../resources/jndi.properties',connection.factory.type='queue'," - + "destination = '${" + MB_DESTINATION + "}', connection.factory.jndi.name='QueueConnectionFactory'," - + "@map(type='" + MAPPING + "'))"; + + "destination = '${" + MB_DESTINATION + "}', connection.factory.jndi.name=" + + "'QueueConnectionFactory',@map(type='" + MAPPING + "'))"; public static final String PARTITIONED_MB_SINK_TEMPLATE = "@sink(type='jms'," + "factory.initial='org.wso2.andes.jndi.PropertiesFileInitialContextFactory'," @@ -101,7 +106,8 @@ public class ResourceManagerConstants { + "@map(type='" + MAPPING + "'))"; - public static final String DESTINATION = "@destination(partition.no = '${" + PARTITION_NO + "}')"; + public static final String DESTINATION = "@destination(partition.no = '${" + PARTITION_NO + + "}')"; public static final String KEY_NODE_ID = "managerNodeId"; @@ -119,7 +125,8 @@ public class ResourceManagerConstants { public static final String KEY_NODE_PASSWORD = "httpsInterfacePassword"; - public static final String TASK_UPSERT_RESOURCE_MAPPING = "Inserting/Updating resource mapping group"; + public static final String TASK_UPSERT_RESOURCE_MAPPING = "Inserting/Updating resource mapping " + + "group"; public static final String TASK_GET_RESOURCE_MAPPING = "Getting resource mapping group"; @@ -135,6 +142,4 @@ public class ResourceManagerConstants { public static final String PS_SELECT_RESOURCE_MAPPING_ROW = "SELECT GROUP_ID, RESOURCE_MAPPING FROM RESOURCE_POOL_TABLE WHERE GROUP_ID =?"; - - } diff --git a/components/org.wso2.carbon.sp.jobmanager.core/src/test/java/org/wso2/carbon/sp/jobmanager/core/JmsTransportTestCase.java b/components/org.wso2.carbon.sp.jobmanager.core/src/test/java/org/wso2/carbon/sp/jobmanager/core/JmsTransportTestCase.java index d6048f1528e..011e3d8d607 100644 --- a/components/org.wso2.carbon.sp.jobmanager.core/src/test/java/org/wso2/carbon/sp/jobmanager/core/JmsTransportTestCase.java +++ b/components/org.wso2.carbon.sp.jobmanager.core/src/test/java/org/wso2/carbon/sp/jobmanager/core/JmsTransportTestCase.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except @@ -37,7 +37,6 @@ import org.wso2.siddhi.core.util.transport.InMemoryBroker; import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -54,7 +53,6 @@ public class JmsTransportTestCase { private JmsQueuePublisherTestUtil jmsQueuePublisher; private JmsTopicPublisherTestUtil jmsTopicPublisher; - @BeforeMethod public void setUp() { jmsQueuePublisher = new JmsQueuePublisherTestUtil(); @@ -71,11 +69,12 @@ public void testSiddhiTopologyCreator() { String siddhiApp = "@App:name('Energy-Alert-App')\n" + "@App:description('Energy consumption and anomaly detection')\n" + "@source(type = 'http', topic = 'device-power', @map(type = 'json'))\n" - + "define stream DevicePowerStream (type string, deviceID string, power int, roomID string);\n" + + "define stream DevicePowerStream (type string, deviceID string, power int," + + " roomID string);\n" + "@sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john'," + " address = 'john@gmail.com'," - + " password ='test', subject = 'High power consumption of {{deviceID}}', @map(type = 'xml'," - + " @payload('Device ID: {{deviceID}} of" + + " password ='test', subject = 'High power consumption of {{deviceID}}', " + + "@map(type = 'xml', @payload('Device ID: {{deviceID}} of" + "room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))\n" + "define stream AlertStream (deviceID string, roomID string, initialPower double, " + "finalPower double,autorityContactEmail string);\n" @@ -91,16 +90,16 @@ public void testSiddhiTopologyCreator() { + "select deviceID, avg(power) as avgPower, roomID\n" + "insert current events into #AvgPowerStream;\n" + "@info(name = 'power-increase-detector')\n" - + "from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower]" - + " within 10 min\n" - + "select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower, " - + "e1.roomID\n" + + "from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) " + + "<= avgPower] within 10 min\n" + + "select e1.deviceID as deviceID, e1.avgPower as initialPower, " + + "e2.avgPower as finalPower, e1.roomID\n" + "insert current events into RisingPowerStream;\n" + "end;\n" + "@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')\n" + "from RisingPowerStream[finalPower > 100]\n" - + "select deviceID, roomID, initialPower, finalPower, 'no-reply@powermanagement.com' " - + "as autorityContactEmail\n" + + "select deviceID, roomID, initialPower, finalPower, " + + "'no-reply@powermanagement.com' as autorityContactEmail\n" + "insert current events into AlertStream;\n" + "@info(name = 'internal-filter')@dist(execGroup='004')\n" + "from DevicePowerStream[type == 'internal']\n" @@ -120,8 +119,8 @@ public void testSiddhiTopologyCreator() { } /** - * Filter query can reside in an execGroup with parallel > 1 and the corresponding stream will have - * {@link TransportStrategy#ROUND_ROBIN}. + * Filter query can reside in an execGroup with parallel > 1 and the corresponding stream + * will have {@link TransportStrategy#ROUND_ROBIN}. */ @Test(dependsOnMethods = "testSiddhiTopologyCreator") @@ -140,16 +139,18 @@ public void testFilterQuery() { SiddhiTopologyCreatorImpl siddhiTopologyCreator = new SiddhiTopologyCreatorImpl(); SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("TempInternalStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.ROUND_ROBIN); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("TempInternalStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.ROUND_ROBIN); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); InputHandler tempStreamHandler = - siddhiAppRuntimeMap.get("TestPlan2-001").get(0).getInputHandler("TempStream"); + siddhiAppRuntimeMap.get("TestPlan2-001").get(0) + .getInputHandler("TempStream"); for (SiddhiAppRuntime runtime : siddhiAppRuntimeMap.get("TestPlan2-002")) { runtime.addCallback("HighTempStream", new StreamCallback() { @Override public void receive(Event[] events) { @@ -173,8 +174,8 @@ public void testFilterQuery() { } /** - *Window can can reside in an execGroup with parallel > 1 if the used stream is a (Partitioned/Inner) Stream. - * + *Window can can reside in an execGroup with parallel > 1 if the used stream is a + * (Partitioned/Inner) Stream. */ @Test(dependsOnMethods = "testFilterQuery") public void testPartitionWithWindow() { @@ -194,16 +195,17 @@ public void testPartitionWithWindow() { SiddhiTopologyCreatorImpl siddhiTopologyCreator = new SiddhiTopologyCreatorImpl(); SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("TempInternalStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.FIELD_GROUPING); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("TempInternalStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.FIELD_GROUPING); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); - InputHandler tempStreamHandler = - siddhiAppRuntimeMap.get("TestPlan3-group1").get(0).getInputHandler("TempStream"); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); + InputHandler tempStreamHandler = siddhiAppRuntimeMap.get("TestPlan3-group1").get(0) + .getInputHandler("TempStream"); for (SiddhiAppRuntime runtime : siddhiAppRuntimeMap.get("TestPlan3-group2")) { runtime.addCallback("DeviceTempStream", new StreamCallback() { @Override public void receive(Event[] events) { @@ -230,8 +232,8 @@ public void testPartitionWithWindow() { SiddhiTestHelper.waitForEvents(2000, 2, count, 3000); Assert.assertEquals(count.intValue(), 2); - Assert.assertEquals(errorAssertionCount.intValue(), 0, "No assertion errors should " - + "occur inside callbacks"); + Assert.assertEquals(errorAssertionCount.intValue(), 0, + "No assertion errors should occur inside callbacks"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } finally { @@ -240,7 +242,8 @@ public void testPartitionWithWindow() { } /** - * Sequence can can reside in an execGroup with parallel > 1 if the used stream is a (Partitioned/Inner) Stream. + * Sequence can can reside in an execGroup with parallel > 1 if the used stream is a + * (Partitioned/Inner) Stream. */ @Test(dependsOnMethods = "testPartitionWithWindow") public void testPartitionWithSequence() { @@ -266,10 +269,10 @@ public void testPartitionWithSequence() { List queryGroupList = appCreator.createApps(topology); SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); - InputHandler tempStreamHandler = - siddhiAppRuntimeMap.get("TestPlan4-group1").get(0).getInputHandler("TempStream"); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); + InputHandler tempStreamHandler = siddhiAppRuntimeMap.get("TestPlan4-group1").get(0) + .getInputHandler("TempStream"); for (SiddhiAppRuntime runtime : siddhiAppRuntimeMap.get("TestPlan4-group2")) { runtime.addCallback("PeakTempStream", new StreamCallback() { @Override public void receive(Event[] events) { @@ -301,8 +304,8 @@ public void testPartitionWithSequence() { SiddhiTestHelper.waitForEvents(2500, 2, count, 3000); Assert.assertEquals(count.intValue(), 2); - Assert.assertEquals(errorAssertionCount.intValue(), 0, "No assertion errors should occur " + - "inside callbacks"); + Assert.assertEquals(errorAssertionCount.intValue(), 0, + "No assertion errors should occur inside callbacks"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } finally { @@ -311,8 +314,8 @@ public void testPartitionWithSequence() { } /** - * If a siddhi app contains patterns while the corresponding execution group's parallelism > 1 then - * SiddhiAppValidationException will be thrown + * If a siddhi app contains patterns while the corresponding execution group's + * parallelism > 1 then SiddhiAppValidationException will be thrown */ @Test(dependsOnMethods = "testPartitionWithSequence") public void testPartitionWithPattern() { @@ -340,9 +343,10 @@ public void testPartitionWithPattern() { } /** - * A join can exist with Parallel >1 if the joined stream consists at least one Partitioned Stream. - * The partitioned streams in the join will subscribe with {@link TransportStrategy#FIELD_GROUPING} - * The unpartitioned streams in the join will subscribe with {@link TransportStrategy#ALL} + * A join can exist with Parallel >1 if the joined stream consists at least one + * Partitioned Stream. The partitioned streams in the join will subscribe with + * {@link TransportStrategy#FIELD_GROUPING} The unpartitioned streams in the join will + * subscribe with {@link TransportStrategy#ALL} */ @Test(dependsOnMethods = "testPartitionWithPattern") public void testJoinWithPartition() { @@ -368,21 +372,25 @@ public void testJoinWithPartition() { Assert.assertEquals(topology.getQueryGroupList().get(0).getInputStreams().get("TempStream") .getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("TempInternalStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.FIELD_GROUPING); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("RegulatorStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("TempInternalStream") - .getSubscriptionStrategy().getOfferedParallelism(), 2); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("TempInternalStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.FIELD_GROUPING); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("RegulatorStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.ALL); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("TempInternalStream").getSubscriptionStrategy() + .getOfferedParallelism(), 2); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); InputHandler tempStreamHandler = - siddhiAppRuntimeMap.get("TestPlan6-group1").get(0).getInputHandler("TempStream"); + siddhiAppRuntimeMap.get("TestPlan6-group1").get(0) + .getInputHandler("TempStream"); for (SiddhiAppRuntime runtime : siddhiAppRuntimeMap.get("TestPlan6-group2")) { runtime.addCallback("RegulatorActionStream", new StreamCallback() { @Override public void receive(Event[] events) { @@ -407,11 +415,11 @@ public void testJoinWithPartition() { tempStreamHandler.send(new Object[]{2, 101, 45}); Thread.sleep(1000); try { - jmsTopicPublisher.publishMessage("TestPlan6_RegulatorStream", "" - + "1100" + jmsTopicPublisher.publishMessage("TestPlan6_RegulatorStream", + "1100" + "false)"); - jmsTopicPublisher.publishMessage("TestPlan6_RegulatorStream", "" - + "2101" + jmsTopicPublisher.publishMessage("TestPlan6_RegulatorStream", + "2101" + "false"); } catch (Exception e) { System.out.println(e.getMessage()); @@ -419,20 +427,18 @@ public void testJoinWithPartition() { SiddhiTestHelper.waitForEvents(2000, 2, count, 3000); Assert.assertEquals(count.intValue(), 2); - Assert.assertEquals(errorAssertionCount.intValue(), 0, "No assertion errors should" - + " occur inside callbacks"); + Assert.assertEquals(errorAssertionCount.intValue(), 0, + "No assertion errors should occur inside callbacks"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } finally { siddhiManager.shutdown(); } - - } /** - * A partitioned stream used outside the Partition but inside the same execGroup will have the Subscription - * strategy of {@link TransportStrategy#FIELD_GROUPING} + * A partitioned stream used outside the Partition but inside the same execGroup will + * have the Subscription strategy of {@link TransportStrategy#FIELD_GROUPING} */ @Test(dependsOnMethods = "testJoinWithPartition") public void testPartitionStrategy() { @@ -458,27 +464,30 @@ public void testPartitionStrategy() { + "insert into RegulatorActionStream;" + "end;"; - SiddhiTopologyCreatorImpl siddhiTopologyCreator = new SiddhiTopologyCreatorImpl(); SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); Assert.assertEquals(topology.getQueryGroupList().get(0).getInputStreams().get("TempStream") .getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("TempInternalStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.FIELD_GROUPING); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("RegulatorStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("TempInternalStream") - .getSubscriptionStrategy().getOfferedParallelism(), 2); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("TempInternalStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.FIELD_GROUPING); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("RegulatorStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.ALL); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("TempInternalStream").getSubscriptionStrategy() + .getOfferedParallelism(), 2); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); InputHandler tempStreamHandler = - siddhiAppRuntimeMap.get("TestPlan7-group1").get(0).getInputHandler("TempStream"); + siddhiAppRuntimeMap.get("TestPlan7-group1").get(0) + .getInputHandler("TempStream"); for (SiddhiAppRuntime runtime : siddhiAppRuntimeMap.get("TestPlan7-group2")) { runtime.addCallback("RegulatorActionStream", new StreamCallback() { @Override public void receive(Event[] events) { @@ -521,11 +530,11 @@ public void testPartitionStrategy() { tempStreamHandler.send(new Object[]{2, 101, 45}); Thread.sleep(1000); try { - jmsTopicPublisher.publishMessage("TestPlan7_RegulatorStream", "" - + "1100" + jmsTopicPublisher.publishMessage("TestPlan7_RegulatorStream", + "1100" + "false"); - jmsTopicPublisher.publishMessage("TestPlan7_RegulatorStream", "" - + "2101" + jmsTopicPublisher.publishMessage("TestPlan7_RegulatorStream", + "2101" + "false"); } catch (Exception e) { System.out.println(e.getMessage()); @@ -533,8 +542,8 @@ public void testPartitionStrategy() { SiddhiTestHelper.waitForEvents(2000, 2, count, 3000); Assert.assertEquals(count.intValue(), 2); - Assert.assertEquals(errorAssertionCount.intValue(), 0, "No assertion errors should " - + "occur inside callbacks"); + Assert.assertEquals(errorAssertionCount.intValue(), 0, + "No assertion errors should occur inside callbacks"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } finally { @@ -543,14 +552,16 @@ public void testPartitionStrategy() { } /** - * A stream used by multiple partitions residing in different executionGroups and under same Partition key gets - * assigned with the respective parallelism as as distinct publishing strategies. + * A stream used by multiple partitions residing in different executionGroups and under + * same Partition key gets assigned with the respective parallelism as as distinct + * publishing strategies. */ @Test(dependsOnMethods = "testPartitionStrategy") public void testPartitionMultiSubscription() { AtomicInteger dumbStreamCount = new AtomicInteger(0); String siddhiApp = "@App:name('TestPlan8') \n" - + "Define stream stockStream(symbol string, price float, quantity int, tier string);\n" + + "Define stream stockStream(symbol string, price float, quantity int," + + " tier string);\n" + "Define stream companyTriggerStream(symbol string);\n" + "@info(name = 'query1')@dist(parallel='1', execGroup='000')\n" + "From stockStream[price > 100]\n" @@ -567,9 +578,9 @@ public void testPartitionMultiSubscription() { + "From filteredStockStream#window.lengthBatch(2)\n" + "Select symbol, avg(price) as avgPrice, quantity\n" + "Insert into #avgPriceStream;\n" - + "From #avgPriceStream#window.time(5 min) as a right outer join companyTriggerInternalStream#window" - + ".length" - + "(1)\n" + + "From #avgPriceStream#window.time(5 min) as a right outer join " + + "companyTriggerInternalStream#window" + + ".length(1)\n" + "On (companyTriggerInternalStream.symbol == a.symbol)\n" + "Select a.symbol, a.avgPrice, a.quantity\n" + "Insert into triggeredAvgStream;\n" @@ -585,27 +596,33 @@ public void testPartitionMultiSubscription() { SiddhiTopologyCreatorImpl siddhiTopologyCreator = new SiddhiTopologyCreatorImpl(); SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); - Assert.assertEquals(topology.getQueryGroupList().get(0).getInputStreams().get("stockStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("filteredStockStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.FIELD_GROUPING); - Assert.assertEquals(topology.getQueryGroupList().get(2).getInputStreams().get("filteredStockStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.FIELD_GROUPING); - Assert.assertEquals(topology.getQueryGroupList().get(0).getOutputStreams().get("filteredStockStream") - .getPublishingStrategyList().get(0).getParallelism(), 3); - Assert.assertEquals(topology.getQueryGroupList().get(0).getOutputStreams().get("filteredStockStream") - .getPublishingStrategyList().get(0).getGroupingField(), "symbol"); - - - - Assert.assertEquals(topology.getQueryGroupList().get(0).getSiddhiApp(), "@App:name('${appName}') \n" + Assert.assertEquals(topology.getQueryGroupList().get(0).getInputStreams() + .get("stockStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.ALL); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("filteredStockStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.FIELD_GROUPING); + Assert.assertEquals(topology.getQueryGroupList().get(2).getInputStreams() + .get("filteredStockStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.FIELD_GROUPING); + Assert.assertEquals(topology.getQueryGroupList().get(0).getOutputStreams() + .get("filteredStockStream").getPublishingStrategyList().get(0) + .getParallelism(), 3); + Assert.assertEquals(topology.getQueryGroupList().get(0).getOutputStreams() + .get("filteredStockStream").getPublishingStrategyList().get(0) + .getGroupingField(), "symbol"); + + Assert.assertEquals(topology.getQueryGroupList().get(0).getSiddhiApp(), + "@App:name('${appName}') \n" + "${companyTriggerStream}" + "Define stream companyTriggerStream(symbol string);\n" + "${stockStream}" - + "Define stream stockStream(symbol string, price float, quantity int, tier string);\n" - + "${filteredStockStream}define stream filteredStockStream (symbol string, price float, quantity int," + + "Define stream stockStream(symbol string, price float, quantity int," + " tier string);\n" - + "${companyTriggerInternalStream}define stream companyTriggerInternalStream (symbol string);\n" + + "${filteredStockStream}define stream filteredStockStream (symbol string, " + + "price float, quantity int, tier string);\n" + + "${companyTriggerInternalStream}define stream " + + "companyTriggerInternalStream (symbol string);\n" + "@info(name = 'query1')\n" + "From stockStream[price > 100]\n" + "Select *\n" @@ -616,20 +633,22 @@ public void testPartitionMultiSubscription() { + "insert into\n" + "companyTriggerInternalStream;\n"); - Assert.assertEquals(topology.getQueryGroupList().get(1).getSiddhiApp(), "@App:name('${appName}') \n" - + "${companyTriggerInternalStream}define stream companyTriggerInternalStream (symbol string);\n" - + "${filteredStockStream}define stream filteredStockStream (symbol string, price float, quantity int," - + " tier string);\n" - + "${triggeredAvgStream}define stream triggeredAvgStream (symbol string, avgPrice double, quantity " - + "int);\n" + Assert.assertEquals(topology.getQueryGroupList().get(1).getSiddhiApp(), + "@App:name('${appName}') \n" + + "${companyTriggerInternalStream}define stream " + + "companyTriggerInternalStream (symbol string);\n" + + "${filteredStockStream}define stream filteredStockStream (symbol string," + + " price float, quantity int, tier string);\n" + + "${triggeredAvgStream}define stream triggeredAvgStream (symbol string, " + + "avgPrice double, quantity int);\n" + "@info(name='query3')\n" + "Partition with (symbol of filteredStockStream)\n" + "begin\n" + "From filteredStockStream#window.lengthBatch(2)\n" + "Select symbol, avg(price) as avgPrice, quantity\n" + "Insert into #avgPriceStream;\n" - + "From #avgPriceStream#window.time(5 min) as a right outer join companyTriggerInternalStream#window" - + ".length(1)\n" + + "From #avgPriceStream#window.time(5 min) as a right outer join " + + "companyTriggerInternalStream#window.length(1)\n" + "On (companyTriggerInternalStream.symbol == a.symbol)\n" + "Select a.symbol, a.avgPrice, a.quantity\n" + "Insert into triggeredAvgStream;\n" @@ -640,12 +659,12 @@ public void testPartitionMultiSubscription() { SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); - InputHandler stockStreamHandler = - siddhiAppRuntimeMap.get("TestPlan8-000").get(0).getInputHandler("stockStream"); - InputHandler triggerStreamHandler = - siddhiAppRuntimeMap.get("TestPlan8-000").get(0).getInputHandler("companyTriggerStream"); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); + InputHandler stockStreamHandler = siddhiAppRuntimeMap.get("TestPlan8-000").get(0) + .getInputHandler("stockStream"); + InputHandler triggerStreamHandler = siddhiAppRuntimeMap.get("TestPlan8-000").get(0) + .getInputHandler("companyTriggerStream"); for (SiddhiAppRuntime runtime : siddhiAppRuntimeMap.get("TestPlan8-001")) { runtime.addCallback("triggeredAvgStream", new StreamCallback() { @Override public void receive(Event[] events) { @@ -681,11 +700,12 @@ public void testPartitionMultiSubscription() { triggerStreamHandler.send(new Object[]{"WSO2"}); SiddhiTestHelper.waitForEvents(1000, 1, count, 2000); - SiddhiTestHelper.waitForEvents(1000, 2, dumbStreamCount, 2000); + SiddhiTestHelper.waitForEvents(1000, 2, dumbStreamCount, + 2000); Assert.assertEquals(count.intValue(), 1); Assert.assertEquals(dumbStreamCount.intValue(), 2); - Assert.assertEquals(errorAssertionCount.intValue(), 0, "No assertion errors should " - + "occur inside callbacks"); + Assert.assertEquals(errorAssertionCount.intValue(), 0, + "No assertion errors should occur inside callbacks"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } finally { @@ -694,14 +714,16 @@ public void testPartitionMultiSubscription() { } /** - * A stream used by multiple partitions residing in different executionGroups and different Partition key gets - * assigned with the {@link TransportStrategy#FIELD_GROUPING} and corresponding parallelism. + * A stream used by multiple partitions residing in different executionGroups and + * different Partition key gets assigned with the {@link TransportStrategy#FIELD_GROUPING} + * and corresponding parallelism. */ @Test(dependsOnMethods = "testPartitionMultiSubscription") public void testPartitionWithMultiKey() { AtomicInteger dumbStreamCount = new AtomicInteger(0); String siddhiApp = "@App:name('TestPlan9') \n" - + "Define stream stockStream(symbol string, price float, quantity int, tier string);\n" + + "Define stream stockStream(symbol string, price float, quantity int," + + " tier string);\n" + "Define stream companyTriggerStream(symbol string);\n" + "@info(name = 'query1')@dist(parallel='1', execGroup='001')\n" + "From stockStream[price > 100]\n" @@ -731,28 +753,32 @@ public void testPartitionWithMultiKey() { + "Select tier, count() as eventCount\n" + "Insert into dumbStream;\n" + "End;\n"; + SiddhiTopologyCreatorImpl siddhiTopologyCreator = new SiddhiTopologyCreatorImpl(); SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); - Assert.assertEquals(topology.getQueryGroupList().get(0).getInputStreams().get("stockStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("filteredStockStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.FIELD_GROUPING); - Assert.assertEquals(topology.getQueryGroupList().get(0).getOutputStreams().get("filteredStockStream") - .getPublishingStrategyList().get(0).getGroupingField(), "symbol"); - Assert.assertEquals(topology.getQueryGroupList().get(0).getOutputStreams().get("filteredStockStream") - .getPublishingStrategyList().get(1).getGroupingField(), "tier"); + Assert.assertEquals(topology.getQueryGroupList().get(0).getInputStreams() + .get("stockStream").getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("filteredStockStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.FIELD_GROUPING); + Assert.assertEquals(topology.getQueryGroupList().get(0).getOutputStreams() + .get("filteredStockStream").getPublishingStrategyList().get(0) + .getGroupingField(), "symbol"); + Assert.assertEquals(topology.getQueryGroupList().get(0).getOutputStreams() + .get("filteredStockStream").getPublishingStrategyList().get(1) + .getGroupingField(), "tier"); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); - InputHandler stockStreamHandler = - siddhiAppRuntimeMap.get("TestPlan9-001").get(0).getInputHandler("stockStream"); - InputHandler triggerStreamHandler = - siddhiAppRuntimeMap.get("TestPlan9-001").get(0).getInputHandler("companyTriggerStream"); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); + InputHandler stockStreamHandler = siddhiAppRuntimeMap.get("TestPlan9-001").get(0) + .getInputHandler("stockStream"); + InputHandler triggerStreamHandler = siddhiAppRuntimeMap.get("TestPlan9-001").get(0) + .getInputHandler("companyTriggerStream"); for (SiddhiAppRuntime runtime : siddhiAppRuntimeMap.get("TestPlan9-002")) { runtime.addCallback("triggeredAvgStream", new StreamCallback() { @Override public void receive(Event[] events) { @@ -796,11 +822,12 @@ public void testPartitionWithMultiKey() { triggerStreamHandler.send(new Object[]{"WSO2"}); SiddhiTestHelper.waitForEvents(2000, 1, count, 3000); - SiddhiTestHelper.waitForEvents(2000, 2, dumbStreamCount, 3000); + SiddhiTestHelper.waitForEvents(2000, 2, dumbStreamCount, + 3000); Assert.assertEquals(count.intValue(), 1); Assert.assertEquals(dumbStreamCount.intValue(), 2); - Assert.assertEquals(errorAssertionCount.intValue(), 0, "No assertion errors should " - + "occur inside callbacks"); + Assert.assertEquals(errorAssertionCount.intValue(), 0, + "No assertion errors should occur inside callbacks"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } finally { @@ -809,15 +836,17 @@ public void testPartitionWithMultiKey() { } /** - * user given Sink used in (parallel/multiple execGroups) will get assigned to a all the execGroups after Topology - * creation + * user given Sink used in (parallel/multiple execGroups) will get assigned to a all the + * execGroups after Topology creation */ @Test(dependsOnMethods = "testPartitionWithMultiKey") public void testUserDefinedSink() { String siddhiApp = "@App:name('TestPlan10') \n" - + "Define stream stockStream(symbol string, price float, quantity int, tier string);\n" + + "Define stream stockStream(symbol string, price float, quantity int, " + + "tier string);\n" + "@Sink(type='inMemory', topic='takingOverTopic', @map(type='passThrough'))\n" - + "Define stream takingOverStream(symbol string, overtakingSymbol string, avgPrice double);\n" + + "Define stream takingOverStream(symbol string, overtakingSymbol string, " + + "avgPrice double);\n" + "@info(name = 'query1')@dist(parallel='1', execGroup='001')\n" + "From stockStream[price > 100]\n" + "Select *\n" @@ -837,8 +866,10 @@ public void testUserDefinedSink() { SiddhiTopologyCreatorImpl siddhiTopologyCreator = new SiddhiTopologyCreatorImpl(); SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); - Assert.assertTrue(topology.getQueryGroupList().get(1).getOutputStreams().containsKey("takingOverStream")); - Assert.assertTrue(topology.getQueryGroupList().get(2).getOutputStreams().containsKey("takingOverStream")); + Assert.assertTrue(topology.getQueryGroupList().get(1).getOutputStreams() + .containsKey("takingOverStream")); + Assert.assertTrue(topology.getQueryGroupList().get(2).getOutputStreams() + .containsKey("takingOverStream")); InMemoryBroker.Subscriber subscriptionTakingOver = new InMemoryBroker.Subscriber() { @Override @@ -866,15 +897,16 @@ public String getTopic() { return "takingOverTopic"; } }; + InMemoryBroker.subscribe(subscriptionTakingOver); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); - InputHandler stockStreamHandler = - siddhiAppRuntimeMap.get("TestPlan10-001").get(0).getInputHandler("stockStream"); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); + InputHandler stockStreamHandler = siddhiAppRuntimeMap.get("TestPlan10-001").get(0) + .getInputHandler("stockStream"); stockStreamHandler.send(new Object[]{"WSO2", 150F, 2, "middleware"}); stockStreamHandler.send(new Object[]{"WSO2", 200F, 2, "middleware"}); @@ -883,8 +915,8 @@ public String getTopic() { SiddhiTestHelper.waitForEvents(1500, 3, count, 2000); Assert.assertEquals(count.intValue(), 3); - Assert.assertEquals(errorAssertionCount.intValue(), 0, "No assertion errors should " - + "occur inside callbacks"); + Assert.assertEquals(errorAssertionCount.intValue(), 0, + "No assertion errors should occur inside callbacks"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } finally { @@ -894,20 +926,22 @@ public String getTopic() { } /** - * when a user defined sink stream is used as in an internal source stream, a placeholder corresponding to the - * streamID will be added to the respective sink so that the placeholder will bridge the stream to the required - * source. + * when a user defined sink stream is used as in an internal source stream, a placeholder + * corresponding to the streamID will be added to the respective sink so that the placeholder + * will bridge the stream to the required source. */ @Test(dependsOnMethods = "testUserDefinedSink") public void testSinkStreamForSource() { - String siddhiApp = "@App:name('TestPlan11')\n" - + "Define stream stockStream(symbol string, price float, quantity int, tier string);\n" + + "Define stream stockStream(symbol string, price float, quantity int," + + " tier string);\n" + "Define stream companyTriggerStream(symbol string);\n" + "@Sink(type='inMemory', topic='takingOverTopic', @map(type='passThrough'))\n" - + "Define stream takingOverStream(symbol string, overtakingSymbol string, avgPrice double);\n" + + "Define stream takingOverStream(symbol string, overtakingSymbol string, " + + "avgPrice double);\n" + "@Sink(type='inMemory', topic='takingOverTableTopic', @map(type='passThrough'))\n" - + "Define stream takingOverTableStream(symbol string, overtakingSymbol string, avgPrice double);\n" + + "Define stream takingOverTableStream(symbol string, overtakingSymbol string, " + + "avgPrice double);\n" + "@info(name = 'query1')@dist(parallel='1', execGroup='001')\n" + "From stockStream[price > 100]\n" + "Select * Insert into filteredStockStream;\n" @@ -929,15 +963,19 @@ public void testSinkStreamForSource() { Assert.assertEquals(topology.getQueryGroupList().get(0).getInputStreams().get("stockStream") .getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); - Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams().get("filteredStockStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.FIELD_GROUPING); - Assert.assertEquals(topology.getQueryGroupList().get(2).getInputStreams().get("avgPriceStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); - Assert.assertEquals(topology.getQueryGroupList().get(3).getInputStreams().get("takingOverStream") - .getSubscriptionStrategy().getStrategy(), TransportStrategy.ALL); - - Assert.assertEquals(topology.getQueryGroupList().get(2).getOutputStreams().get("takingOverStream") - .getStreamDefinition() + ";", "@Sink(type='inMemory', topic='takingOverTopic', " + Assert.assertEquals(topology.getQueryGroupList().get(1).getInputStreams() + .get("filteredStockStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.FIELD_GROUPING); + Assert.assertEquals(topology.getQueryGroupList().get(2).getInputStreams() + .get("avgPriceStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.ALL); + Assert.assertEquals(topology.getQueryGroupList().get(3).getInputStreams() + .get("takingOverStream").getSubscriptionStrategy() + .getStrategy(), TransportStrategy.ALL); + + Assert.assertEquals(topology.getQueryGroupList().get(2).getOutputStreams() + .get("takingOverStream").getStreamDefinition() + + ";", "@Sink(type='inMemory', topic='takingOverTopic', " + "@map(type='passThrough'))\n" + "${takingOverStream} \n" + "Define stream takingOverStream(symbol string, overtakingSymbol string, " @@ -966,7 +1004,6 @@ public String getTopic() { } }; - InMemoryBroker.Subscriber subscriptionTakingOverTable = new InMemoryBroker.Subscriber() { @Override public void onMessage(Object msg) { @@ -996,10 +1033,10 @@ public String getTopic() { List queryGroupList = appCreator.createApps(topology); SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); - InputHandler stockStreamHandler = - siddhiAppRuntimeMap.get("TestPlan11-001").get(0).getInputHandler("stockStream"); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); + InputHandler stockStreamHandler = siddhiAppRuntimeMap.get("TestPlan11-001").get(0) + .getInputHandler("stockStream"); stockStreamHandler.send(new Object[]{"WSO2", 150F, 2, "middleware"}); stockStreamHandler.send(new Object[]{"WSO2", 200F, 2, "middleware"}); @@ -1008,8 +1045,9 @@ public String getTopic() { SiddhiTestHelper.waitForEvents(5000, 3, count, 8000); Assert.assertEquals(count.intValue(), 4); - Assert.assertEquals(errorAssertionCount.intValue(), 0, "No assertion errors should " + - "occur inside callbacks"); + Assert.assertEquals(errorAssertionCount.intValue(), 0, + "No assertion errors should " + + "occur inside callbacks"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } finally { @@ -1020,14 +1058,16 @@ public String getTopic() { } /** - * when user given sources are located in more than 1 execGroup then a passthrough query will be added in a new - * execGroup.Newly created execGroup will be moved to as the first element of already created passthrough queries + * when user given sources are located in more than 1 execGroup then a passthrough query + * will be added in a new execGroup.Newly created execGroup will be moved to as the first + * element of already created passthrough queries */ @Test(dependsOnMethods = "testSinkStreamForSource") public void testUsergivenSourceNoGroup() { String siddhiApp = "@App:name('testplan12') \n" + "@source(type='inMemory', topic='stock', @map(type='json'))\n" - + "Define stream stockstream(symbol string, price float, quantity int, tier string);\n" + + "Define stream stockstream(symbol string, price float, quantity int," + + " tier string);\n" + "@source(type='inMemory', topic='companyTrigger', @map(type='json'))\n" + "Define stream companyTriggerStream(symbol string);\n" + "@info(name = 'query1')@dist(parallel='3', execGroup='001')\n" @@ -1048,8 +1088,8 @@ public void testUsergivenSourceNoGroup() { + "From filteredStockStream#window.lengthBatch(2)\n" + "Select symbol, avg(price) as avgPrice, quantity\n" + "Insert into #avgPriceStream;\n" - + "From #avgPriceStream#window.time(5 min) as a right outer join companyTriggerStream#window.length" - + "(1) \n" + + "From #avgPriceStream#window.time(5 min) as a right outer join " + + "companyTriggerStream#window.length(1) \n" + "On (companyTriggerStream.symbol == a.symbol)\n" + "Select a.symbol, a.avgPrice, a.quantity, companyTriggerStream.symbol as sss\n" + "Insert into triggeredAvgStream;\n" @@ -1059,17 +1099,22 @@ public void testUsergivenSourceNoGroup() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); - Assert.assertTrue(queryGroupList.size() == 4, "Four query groups should be created"); - Assert.assertTrue(queryGroupList.get(0).getGroupName().contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), + Assert.assertTrue(queryGroupList.size() == 4, + "Four query groups should be created"); + Assert.assertTrue(queryGroupList.get(0).getGroupName() + .contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), "Two passthrough queries should be present in separate groups"); - Assert.assertTrue(queryGroupList.get(0).isReceiverQueryGroup(), "Receiver type should be set"); - Assert.assertTrue(queryGroupList.get(1).getGroupName().contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), + Assert.assertTrue(queryGroupList.get(0).isReceiverQueryGroup(), + "Receiver type should be set"); + Assert.assertTrue(queryGroupList.get(1).getGroupName() + .contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), "Two passthrough queries should be present in separate groups"); - Assert.assertTrue(queryGroupList.get(1).isReceiverQueryGroup(), "Receiver type should be set"); + Assert.assertTrue(queryGroupList.get(1).isReceiverQueryGroup(), + "Receiver type should be set"); SiddhiManager siddhiManager = new SiddhiManager(); try { - Map> siddhiAppRuntimeMap = createSiddhiAppRuntimes(siddhiManager, - queryGroupList); + Map> siddhiAppRuntimeMap = + createSiddhiAppRuntimes(siddhiManager, queryGroupList); for (SiddhiAppRuntime runtime : siddhiAppRuntimeMap.get("testplan12-001")) { runtime.addCallback("stockstream", new StreamCallback() { @Override @@ -1086,14 +1131,14 @@ public void receive(Event[] events) { } }); } - InMemoryBroker.publish("stock", "{\"event\":{\"symbol\":\"WSO2\", \"price\":225.0, " - + "\"quantity\":20,\"tier\":\"middleware\"}}"); + InMemoryBroker.publish("stock", "{\"event\":{\"symbol\":\"WSO2\"," + + " \"price\":225.0, \"quantity\":20,\"tier\":\"middleware\"}}"); Thread.sleep(5000); SiddhiTestHelper.waitForEvents(8000, 1, count, 10000); Assert.assertEquals(count.intValue(), 1); - Assert.assertEquals(errorAssertionCount.intValue(), 0, "No assertion errors should " - + "occur inside callbacks"); + Assert.assertEquals(errorAssertionCount.intValue(), 0, + "No assertion errors should occur inside callbacks"); } catch (Exception e) { log.error(e.getMessage(), e); } finally { @@ -1105,8 +1150,10 @@ public void receive(Event[] events) { public void testUsergivenParallelSources() { String siddhiApp = "@App:name('TestPlan12') \n" + "@source(type='inMemory', topic='stock', @map(type='json'), @dist(parallel='2')) " - + "@source(type='inMemory', topic='stock123', @map(type='json'), @dist(parallel='3')) " - + "Define stream stockStream(symbol string, price float, quantity int, tier string);\n" + + "@source(type='inMemory', topic='stock123', @map(type='json')," + + " @dist(parallel='3')) " + + "Define stream stockStream(symbol string, price float, quantity int," + + " tier string);\n" + "@source(type='inMemory', topic='companyTrigger', @map(type='json'))" + "Define stream companyTriggerStream(symbol string);\n" + "@info(name = 'query1')@dist(parallel='3', execGroup='001')\n" @@ -1127,8 +1174,8 @@ public void testUsergivenParallelSources() { + "From filteredStockStream#window.lengthBatch(2)\n" + "Select symbol, avg(price) as avgPrice, quantity\n" + "Insert into #avgPriceStream;\n" - + "From #avgPriceStream#window.time(5 min) as a right outer join companyTriggerStream#window.length" - + "(1) \n" + + "From #avgPriceStream#window.time(5 min) as a right outer join " + + "companyTriggerStream#window.length(1) \n" + "On (companyTriggerStream.symbol == a.symbol)\n" + "Select a.symbol, a.avgPrice, a.quantity, companyTriggerStream.symbol as sss\n" + "Insert into triggeredAvgStream;\n" @@ -1138,20 +1185,28 @@ public void testUsergivenParallelSources() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); - Assert.assertTrue(queryGroupList.size() == 5, "Five query groups should be created"); - Assert.assertTrue(queryGroupList.get(0).getGroupName().contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), + Assert.assertTrue(queryGroupList.size() == 5, + "Five query groups should be created"); + Assert.assertTrue(queryGroupList.get(0).getGroupName() + .contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), "Two passthrough queries should be present in separate groups"); - Assert.assertTrue(queryGroupList.get(0).isReceiverQueryGroup(), "Receiver type should be set"); - Assert.assertTrue(queryGroupList.get(1).getGroupName().contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), + Assert.assertTrue(queryGroupList.get(0).isReceiverQueryGroup(), + "Receiver type should be set"); + Assert.assertTrue(queryGroupList.get(1).getGroupName() + .contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), "Two passthrough queries should be present in separate groups"); - Assert.assertTrue(queryGroupList.get(1).isReceiverQueryGroup(), "Receiver type should be set"); - Assert.assertTrue(queryGroupList.get(2).getGroupName().contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), + Assert.assertTrue(queryGroupList.get(1).isReceiverQueryGroup(), + "Receiver type should be set"); + Assert.assertTrue(queryGroupList.get(2).getGroupName() + .contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), "Two passthrough queries should be present in separate groups"); - Assert.assertTrue(queryGroupList.get(2).isReceiverQueryGroup(), "Receiver type should be set"); + Assert.assertTrue(queryGroupList.get(2).isReceiverQueryGroup(), + "Receiver type should be set"); } /** - * Test pass-through creation when multiple subscriptions R/R and Field grouping exist for user given stream. + * Test pass-through creation when multiple subscriptions R/R and Field grouping exist for user + * given stream. */ @Test(dependsOnMethods = "testUsergivenParallelSources") public void testPassthoughWithMultipleSubscription() { @@ -1177,23 +1232,34 @@ public void testPassthoughWithMultipleSubscription() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); - Assert.assertTrue(queryGroupList.size() == 3, "Three query groups should be created"); - Assert.assertTrue(queryGroupList.get(0).getGroupName().contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), + Assert.assertTrue(queryGroupList.size() == 3, + "Three query groups should be created"); + Assert.assertTrue(queryGroupList.get(0).getGroupName() + .contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), "Passthrough query should be present in a separate group"); - Assert.assertTrue(queryGroupList.get(0).isReceiverQueryGroup(), "Receiver type should be set"); + Assert.assertTrue(queryGroupList.get(0).isReceiverQueryGroup(), + "Receiver type should be set"); Assert.assertTrue(queryGroupList.get(0).getSiddhiQueries().get(0).getApp() - .contains("@source(type = 'http', receiver.url='http://localhost:8080/SweetProductionEP'," + .contains("@source(type = 'http', receiver.url='http://localhost:8080" + + "/SweetProductionEP'," + " @map(type = 'json'))\n" - + "define stream passthroughTest1Stream (name string, amount double);\n" + + "define stream passthroughTest1Stream (name string, amount " + + "double);\n" + "@sink(type='jms',factory.initial='org.wso2.andes.jndi" - + ".PropertiesFileInitialContextFactory',provider.url='../../resources/jndi.properties'" - + ",connection.factory.type='topic',destination = 'TestPlan13_Test1Stream'," - + " connection.factory.jndi.name='TopicConnectionFactory',@map(type='xml'))\n" + + ".PropertiesFileInitialContextFactory'," + + "provider.url='../../resources/jndi.properties'" + + ",connection.factory.type='topic',destination = " + + "'TestPlan13_Test1Stream'," + + " connection.factory.jndi.name='TopicConnectionFactory'," + + "@map(type='xml'))\n" + "@sink(type='jms'," - + "factory.initial='org.wso2.andes.jndi.PropertiesFileInitialContextFactory'," - + "provider.url='../../resources/jndi.properties',@distribution(strategy='partitioned'," - + " partitionKey='name',@destination(destination = 'TestPlan13_Test1Stream_name_0')" + + "factory.initial='org.wso2.andes.jndi" + + ".PropertiesFileInitialContextFactory'," + + "provider.url='../../resources/jndi.properties'," + + "@distribution(strategy='partitioned'," + + " partitionKey='name',@destination(destination = " + + "'TestPlan13_Test1Stream_name_0')" + ",@destination(destination = 'TestPlan13_Test1Stream_name_1')," + "@destination(destination = 'TestPlan13_Test1Stream_name_2'))," + "connection.factory.type='topic',connection.factory" @@ -1207,22 +1273,28 @@ public void testPassthoughWithMultipleSubscription() { .contains("@App:name('TestPlan13-001-1') \n" + "@source(type='jms',factory.initial='org.wso2.andes" + ".jndi.PropertiesFileInitialContextFactory'" - + ",provider.url='../../resources/jndi.properties',connection.factory.type='topic'," + + ",provider.url='../../resources/jndi.properties'," + + "connection.factory.type='topic'," + "destination ='TestPlan13_Test1Stream' ," - + " connection.factory.jndi.name='TopicConnectionFactory',@map(type='xml')) \n" + + " connection.factory.jndi.name='TopicConnectionFactory'," + + "@map(type='xml')) \n" + "define stream Test1Stream (name string, amount double);\n" + "define stream Test3Stream (name string, amount double);\n" + "@info(name = 'query2')\n" + " from Test1Stream\n" + "select *\n" - + "insert into Test3Stream;"), "Incorrect partial Siddhi application Created"); + + "insert into Test3Stream;"), + "Incorrect partial Siddhi application Created"); Assert.assertTrue(queryGroupList.get(2).getSiddhiQueries().get(0).getApp() - .contains("@App:name('TestPlan13-002-1') \n" + "@source(type='jms',factory.initial" + .contains("@App:name('TestPlan13-002-1') \n" + "@source(type='jms'" + + ",factory.initial" + "='org.wso2.andes.jndi.PropertiesFileInitialContextFactory'" - + ",provider.url='../../resources/jndi.properties',connection.factory.type='topic'," + + ",provider.url='../../resources/jndi.properties'," + + "connection.factory.type='topic'," + "destination ='TestPlan13_Test1Stream_name_0' ," - + " connection.factory.jndi.name='TopicConnectionFactory',@map(type='xml')) \n" + + " connection.factory.jndi.name='TopicConnectionFactory'," + + "@map(type='xml')) \n" + "define stream Test1Stream (name string, amount double);\n" + "@sink(type='log')\n" + "define stream Test2Stream (name string, amount double);\n" @@ -1233,12 +1305,13 @@ public void testPassthoughWithMultipleSubscription() { + "select name,amount\n" + "insert into Test2Stream;\n" + "end;"), "Incorrect partial Siddhi application Created"); - } + + } /** - * when user given sources are located in one execGroup which has a parallelilsm > 1 then a passthrough query will - * be added in a new execGroup.Newly created execGroup will be moved to as the first element of already created - * passthrough queries + * when user given sources are located in one execGroup which has a parallelilsm > 1 then a + * passthrough query will be added in a new execGroup.Newly created execGroup will be moved + * to as the first element of already created passthrough queries */ @Test(dependsOnMethods = "testPassthoughWithMultipleSubscription") public void testUsergivenSourceSingleGroup() { @@ -1261,32 +1334,41 @@ public void testUsergivenSourceSingleGroup() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); SiddhiAppCreator appCreator = new SPMBSiddhiAppCreator(); List queryGroupList = appCreator.createApps(topology); - Assert.assertTrue(queryGroupList.size() == 2, "Two query groups should be created"); - Assert.assertTrue(queryGroupList.get(0).getGroupName().contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), - "passthrough query should be present in a separate group"); - Assert.assertTrue(queryGroupList.get(0).isReceiverQueryGroup(), "Receiver type should be set"); + Assert.assertTrue(queryGroupList.size() == 2, + "Two query groups should be created"); + Assert.assertTrue(queryGroupList.get(0).getGroupName() + .contains(SiddhiTopologyCreatorConstants.PASSTHROUGH), + "passthrough query should be present in a separate group"); + Assert.assertTrue(queryGroupList.get(0).isReceiverQueryGroup(), + "Receiver type should be set"); Assert.assertTrue(queryGroupList.get(0).getSiddhiQueries().get(0).getApp() - .contains("@source(type = 'http', receiver.url='http://localhost:8080/SweetProductionEP'," - + " @map(type = 'json'))\n" - + "define stream passthroughTest1Stream (name string, amount double);\n" + .contains("@source(type = 'http', receiver.url='http://localhost:8080" + + "/SweetProductionEP', @map(type = 'json'))\n" + + "define stream passthroughTest1Stream (name string, " + + "amount double);\n" + "@sink(type='jms',factory.initial='org.wso2.andes.jndi" - + ".PropertiesFileInitialContextFactory',provider.url='../../resources/jndi.properties'" + + ".PropertiesFileInitialContextFactory'," + + "provider.url='../../resources/jndi.properties'" + ",@distribution(strategy='partitioned', partitionKey='name'," + "@destination(destination = 'TestPlan14_Test1Stream_name_0')," + "@destination(destination = 'TestPlan14_Test1Stream_name_1')," + "@destination(destination = 'TestPlan14_Test1Stream_name_2'))," + "connection.factory.type='topic'," - + "connection.factory.jndi.name='TopicConnectionFactory',@map(type='xml')) \n" + + "connection.factory.jndi.name='TopicConnectionFactory'," + + "@map(type='xml')) \n" + "define stream Test1Stream (name string, amount double);\n" + "from passthroughTest1Stream select * insert into Test1Stream;"), "Incorrect Partial Siddhi application created"); Assert.assertTrue(queryGroupList.get(1).getSiddhiQueries().get(0).getApp() .contains("@App:name('TestPlan14-001-1') \n" + "@source(type='jms',factory.initial='org.wso2.andes.jndi" - + ".PropertiesFileInitialContextFactory',provider.url='../../resources/jndi.properties'" - + ",connection.factory.type='topic',destination ='TestPlan14_Test1Stream_name_0' ," - + " connection.factory.jndi.name='TopicConnectionFactory',@map(type='xml')) \n" + + ".PropertiesFileInitialContextFactory'," + + "provider.url='../../resources/jndi.properties'" + + ",connection.factory.type='topic',destination =" + + "'TestPlan14_Test1Stream_name_0' ," + + " connection.factory.jndi.name='TopicConnectionFactory'," + + "@map(type='xml')) \n" + "define stream Test1Stream (name string, amount double);\n" + "@sink(type='log')\n" + "define stream Test2Stream (name string, amount double);\n" @@ -1297,11 +1379,12 @@ public void testUsergivenSourceSingleGroup() { + "select name,amount\n" + "insert into Test2Stream;\n" + "end;"), "Incorrect partial Siddhi application Created"); + } /** - * If a query inside the partition contains @dist annotation then SiddhiAppValidationException should have been - * thrown + * If a query inside the partition contains @dist annotation then SiddhiAppValidationException + * should have been thrown */ @Test(dependsOnMethods = "testUsergivenSourceSingleGroup") public void testPartitionWithDistAnnotation() { @@ -1325,15 +1408,16 @@ public void testPartitionWithDistAnnotation() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); Assert.fail(); } catch (SiddhiAppValidationException e) { - Assert.assertTrue(e.getMessage().contains("Unsupported:@dist annotation inside partition queries")); + Assert.assertTrue(e.getMessage().contains("Unsupported:@dist annotation inside " + + "partition queries")); } finally { System.out.println("Testing finished"); } } /** - * If queries belongs to same execution groups contain multiple parallelism count then SiddhiAppValidationException - * should have been thrown + * If queries belongs to same execution groups contain multiple parallelism count then + * SiddhiAppValidationException should have been thrown */ @Test(dependsOnMethods = "testPartitionWithDistAnnotation") public void testGroupsWithInconsistentParalleleism() { @@ -1342,8 +1426,9 @@ public void testGroupsWithInconsistentParalleleism() { + "@source(type = 'http', topic = 'device-power', @map(type = 'json'))\n" + "define stream Test1Stream (name string, amount double);\n" + "define stream Test2Stream (name string, amount double);\n" - + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com',password='****'," - + "host='smtp.gmail.com',subject='Event from SP',to='towso2@gmail.com')\n" + + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com'," + + "password='****',host='smtp.gmail.com',subject='Event from SP'," + + "to='towso2@gmail.com')\n" + "define stream Test3Stream (name string, amount double);\n" + "define stream Test4Stream (name string, amount double);\n" + "define stream Test5Stream (name string, amount double);\n" @@ -1376,8 +1461,8 @@ public void testGroupsWithInconsistentParalleleism() { } /** - * If queries belongs to same execution groups contain multiple parallelism count then SiddhiAppValidationException - * should have been thrown + * If queries belongs to same execution groups contain multiple parallelism count then + * SiddhiAppValidationException should have been thrown */ @Test(dependsOnMethods = "testGroupsWithInconsistentParalleleism") public void testConflictingTransportStrategies() { @@ -1386,8 +1471,9 @@ public void testConflictingTransportStrategies() { + "@source(type = 'http', topic = 'device-power', @map(type = 'json'))\n" + "define stream Test1Stream (name string, amount double);\n" + "define stream Test2Stream (name string, amount double);\n" - + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com',password='****'," - + "host='smtp.gmail.com',subject='Event from SP',to='towso2@gmail.com')\n" + + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com'" + + ",password='****',host='smtp.gmail.com',subject='Event from SP'," + + "to='towso2@gmail.com')\n" + "define stream Test3Stream (name string, amount double);\n" + "define stream Test4Stream (name string, amount double);\n" + "define stream Test5Stream (name string, amount double);\n" @@ -1420,8 +1506,8 @@ public void testConflictingTransportStrategies() { } /** - * If an in memory table referenced from multiple execution groups then SiddhiAppValidationException should have - * been thrown. + * If an in memory table referenced from multiple execution groups then + * SiddhiAppValidationException should have been thrown. */ @Test(dependsOnMethods = "testConflictingTransportStrategies") public void testMultipleInMemoryTableReference() { @@ -1431,7 +1517,8 @@ public void testMultipleInMemoryTableReference() { + "define stream Test1Stream (name string, amount double);\n" + "define table filteredTable (name string, amount double);\n" + "define stream Test2Stream (name string, amount double);\n" - + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com',password='****'," + + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com'" + + ",password='****'," + "host='smtp.gmail.com',subject='Event from SP',to='towso2@gmail.com')\n" + "define stream Test3Stream (name string, amount double);\n" + "define stream Test4Stream (name string, amount double);\n" @@ -1458,17 +1545,16 @@ public void testMultipleInMemoryTableReference() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); Assert.fail(); } catch (SiddhiAppValidationException e) { - - Assert.assertTrue(e.getMessage().contains("In-Memory Table referenced from more than one execGroup: " - + "execGroup")); + Assert.assertTrue(e.getMessage().contains("In-Memory Table referenced from more than " + + "one execGroup: execGroup")); } finally { System.out.println("Testing finished"); } } /** - * If an in memory table referenced from an execution group which parallelism > 1 then SiddhiAppValidationException - * should have been thrown + * If an in memory table referenced from an execution group which parallelism > 1 then + * SiddhiAppValidationException should have been thrown */ @Test(dependsOnMethods = "testMultipleInMemoryTableReference") public void testInMemoryParallelExecGroup() { @@ -1478,8 +1564,9 @@ public void testInMemoryParallelExecGroup() { + "define stream Test1Stream (name string, amount double);\n" + "define table filteredTable (name string, amount double);\n" + "define stream Test2Stream (name string, amount double);\n" - + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com',password='****'," - + "host='smtp.gmail.com',subject='Event from SP',to='towso2@gmail.com')\n" + + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com'," + + "password='****',host='smtp.gmail.com',subject='Event from SP'," + + "to='towso2@gmail.com')\n" + "define stream Test3Stream (name string, amount double);\n" + "define stream Test4Stream (name string, amount double);\n" + "define stream Test5Stream (name string, amount double);\n" @@ -1504,8 +1591,8 @@ public void testInMemoryParallelExecGroup() { } /** - * If siddhi window defined in a query which have parallelism > 1 and does not belongs to a partition then - * SiddhiAppValidationException will be thrown. + * If siddhi window defined in a query which have parallelism > 1 and does not belongs + * to a partition then SiddhiAppValidationException will be thrown. */ @Test(dependsOnMethods = "testInMemoryParallelExecGroup") public void testWindowInParallelExecGroup() { @@ -1515,8 +1602,9 @@ public void testWindowInParallelExecGroup() { + "define stream Test1Stream (name string, amount double);\n" + "define table filteredTable (name string, amount double);\n" + "define stream Test2Stream (name string, amount double);\n" - + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com',password='****'," - + "host='smtp.gmail.com',subject='Event from SP',to='towso2@gmail.com')\n" + + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com'," + + "password='****',host='smtp.gmail.com',subject='Event from SP'," + + "to='towso2@gmail.com')\n" + "define stream Test3Stream (name string, amount double);\n" + "define stream Test4Stream (name string, amount double);\n" + "define stream Test5Stream (name string, amount double);\n" @@ -1534,16 +1622,16 @@ public void testWindowInParallelExecGroup() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); Assert.fail(); } catch (SiddhiAppValidationException e) { - Assert.assertTrue(e.getMessage().contains("Window queries used with parallel greater than " - + "1 outside partitioned stream")); + Assert.assertTrue(e.getMessage().contains("Window queries used with parallel " + + "greater than 1 outside partitioned stream")); } finally { System.out.println("Testing finished"); } } /** - * If siddhi window defined in a query which have parallelism > 1 and does not belongs to a partition then - * SiddhiAppValidationException will be thrown. + * If siddhi window defined in a query which have parallelism > 1 and does not belongs to + * a partition then SiddhiAppValidationException will be thrown. */ @Test(dependsOnMethods = "testWindowInParallelExecGroup") public void testMultipleWindowReference() { @@ -1553,8 +1641,9 @@ public void testMultipleWindowReference() { + "define stream Test1Stream (name string, amount double);\n" + "define table filteredTable (name string, amount double);\n" + "define stream Test2Stream (name string, amount double);\n" - + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com',password='****'," - + "host='smtp.gmail.com',subject='Event from SP',to='towso2@gmail.com')\n" + + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com'" + + ",password='****',host='smtp.gmail.com',subject='Event from SP'," + + "to='towso2@gmail.com')\n" + "define stream Test3Stream (name string, amount double);\n" + "define stream Test4Stream (name string, amount double);\n" + "define stream Test5Stream (name string, amount double);\n" @@ -1572,16 +1661,16 @@ public void testMultipleWindowReference() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); Assert.fail(); } catch (SiddhiAppValidationException e) { - Assert.assertTrue(e.getMessage().contains("Window queries used with parallel greater than 1" - + " outside partitioned stream")); + Assert.assertTrue(e.getMessage().contains("Window queries used with parallel greater " + + "than 1 outside partitioned stream")); } finally { System.out.println("Testing finished"); } } /** - * If an execution group contains joins while the parallelism > 1 the SiddhiAppValidationException will be - * thrown + * If an execution group contains joins while the parallelism > 1 the + * SiddhiAppValidationException will be thrown */ @Test(dependsOnMethods = "testMultipleWindowReference") public void testJoinsWithParallel() { @@ -1603,26 +1692,28 @@ public void testJoinsWithParallel() { Assert.fail(); } catch (SiddhiAppValidationException e) { System.out.println(e.getMessage()); - Assert.assertTrue(e.getMessage().contains("Join queries used with parallel greater than 1" - + " outside partitioned stream")); + Assert.assertTrue(e.getMessage().contains("Join queries used with parallel greater " + + "than 1 outside partitioned stream")); } finally { System.out.println("Testing finished"); } } /** - * If an execution group contains more than one partitions with same partitionkey then SiddhiValidationException - * will be thrown + * If an execution group contains more than one partitions with same partitionkey then + * SiddhiValidationException will be thrown */ @Test(dependsOnMethods = "testJoinsWithParallel") public void testMultiplePartitionExecGroup() { String siddhiApp = "@App:name('TestPlan23')\n" - + "@App:description('Testing the MB implementation with multiple FGs ALLs and RRs strategies.')\n" + + "@App:description('Testing the MB implementation with multiple FGs ALLs and " + + "RRs strategies.')\n" + "@source(type = 'http', topic = 'device-power', @map(type = 'json'))\n" + "define stream Test1Stream (name string, amount double,value double);\n" + "define stream Test2Stream (name string, amount double, value double);\n" - + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com',password='****'," - + "host='smtp.gmail.com',subject='Event from SP',to='towso2@gmail.com')\n" + + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com'," + + "password='****',host='smtp.gmail.com',subject='Event from SP'," + + "to='towso2@gmail.com')\n" + "define stream Test3Stream (name string, amount double, value double);\n" + "define stream Test4Stream (name string, amount double, value double);\n" + "define stream Test5Stream (name string, amount double, value double);\n" @@ -1656,26 +1747,28 @@ public void testMultiplePartitionExecGroup() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); Assert.fail(); } catch (SiddhiAppValidationException e) { - Assert.assertTrue(e.getMessage().contains("Unsupported in distributed setup :More than 1 partition" + - " residing on the same execGroup")); + Assert.assertTrue(e.getMessage().contains("Unsupported in distributed setup :More " + + "than 1 partition residing on the same execGroup")); } finally { System.out.println("Testing finished"); } } /** - * If a siddhi application contains range type partitioning then SiddhiAppValidationException will be thrown as - * that feature is not supported still + * If a siddhi application contains range type partitioning then SiddhiAppValidationException + * will be thrown as that feature is not supported still */ @Test(dependsOnMethods = "testMultiplePartitionExecGroup") public void testRangePartition() { String siddhiApp = "@App:name('TestPlan24')\n" - + "@App:description('Testing the MB implementation with multiple FGs ALLs and RRs strategies.')\n" + + "@App:description('Testing the MB implementation with multiple FGs ALLs and " + + "RRs strategies.')\n" + "@source(type = 'http', topic = 'device-power', @map(type = 'json'))\n" + "define stream Test1Stream (name string, amount double,value double);\n" + "define stream Test2Stream (name string, amount double, value double);\n" - + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com',password='****'," - + "host='smtp.gmail.com',subject='Event from SP',to='towso2@gmail.com')\n" + + "@Sink(type='email', @map(type='json'), username='wso2', address='test@wso2.com'," + + "password='****',host='smtp.gmail.com',subject='Event from SP'," + + "to='towso2@gmail.com')\n" + "define stream Test3Stream (name string, amount double, value double);\n" + "define stream Test4Stream (name string, amount double, value double);\n" + "define stream Test5Stream (name string, amount double, value double);\n" @@ -1711,7 +1804,8 @@ public void testRangePartition() { SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); Assert.fail(); } catch (SiddhiAppValidationException e) { - Assert.assertTrue(e.getMessage().contains("Range PartitionType not Supported in Distributed SetUp")); + Assert.assertTrue(e.getMessage().contains("Range PartitionType not Supported in " + + "Distributed SetUp")); } finally { System.out.println("Testing finished"); } @@ -1720,11 +1814,13 @@ public void testRangePartition() { private Map> createSiddhiAppRuntimes( SiddhiManager siddhiManager, List queryGroupList) { - Map> siddhiAppRuntimeMap = new HashMap<>(queryGroupList.size()); + Map> siddhiAppRuntimeMap = new HashMap<>(queryGroupList + .size()); for (DeployableSiddhiQueryGroup group : queryGroupList) { List runtimeList = new ArrayList<>(group.getSiddhiQueries().size()); for (SiddhiQuery siddhiQuery : group.getSiddhiQueries()) { - SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(siddhiQuery.getApp()); + SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(siddhiQuery + .getApp()); runtime.start(); try { Thread.sleep(500);