Skip to content

Commit

Permalink
Refactoring: remove the unused parentAppName/groupName parameters from the Siddhi app creator helper methods and simplify the round-robin queue-count tracking.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketharan committed Sep 19, 2018
1 parent ade6b6e commit ff4f904
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ protected List<Integer> getPartitionNumbers(int appParallelism, int availablePar
}
}

protected List<SiddhiQuery> generateQueryList(String queryTemplate, String parentAppName, String queryGroupName, int
parallelism) {
protected List<SiddhiQuery> generateQueryList(String queryTemplate, String queryGroupName, int parallelism) {
List<SiddhiQuery> queries = new ArrayList<>(parallelism);
for (int i = 0; i < parallelism; i++) {
Map<String, String> valuesMap = new HashMap<>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,47 +38,42 @@
* Creates distributed siddhi application.
*/
public class SPMBSiddhiAppCreator extends AbstractSiddhiAppCreator {
private static final Logger log = Logger.getLogger(SPSiddhiAppCreator.class);
private static final Logger log = Logger.getLogger(SPMBSiddhiAppCreator.class);
private static final int TIMEOUT = 120;
private Map<String, Integer> rrTrackerMap = new HashMap<>();
private int rrHolderCount = 0;

@Override
protected List<SiddhiQuery> createApps(String siddhiAppName, SiddhiQueryGroup queryGroup) {
String groupName = queryGroup.getName();
String queryTemplate = queryGroup.getSiddhiApp();
List<SiddhiQuery> queryList = generateQueryList(queryTemplate, siddhiAppName, groupName, queryGroup
List<SiddhiQuery> queryList = generateQueryList(queryTemplate, groupName, queryGroup
.getParallelism());
processInputStreams(siddhiAppName, groupName, queryList, queryGroup.getInputStreams().values());
processOutputStreams(siddhiAppName, groupName, queryList, queryGroup.getOutputStreams().values());
processInputStreams(siddhiAppName, queryList, queryGroup.getInputStreams().values());
processOutputStreams(siddhiAppName, queryList, queryGroup.getOutputStreams().values());
return queryList;
}

/**
 * Assigns the JMS transport headers to the output streams of each instance Siddhi
 * application of the given execution group.
 *
 * @param siddhiAppName Name of the initial user defined siddhi application
 * @param queryList     Contains the query of the current execution group replicated
 *                      to the parallelism of the group.
 * @param outputStreams Collection of current execution group's output streams
 */
private void processOutputStreams(String siddhiAppName, String groupName, List<SiddhiQuery> queryList,
private void processOutputStreams(String siddhiAppName, List<SiddhiQuery> queryList,
Collection<OutputStreamDataHolder> outputStreams) {
//Store the data for sink stream header
Map<String, String> sinkValuesMap = new HashMap<>();
for (OutputStreamDataHolder outputStream : outputStreams) {

//Contains the header string for each stream
Map<String, String> sinkList = new HashMap<>();

//contains the parallelism count for each partition
//Contains the parallelism count for each partition key
Map<String, Integer> partitionKeys = new HashMap<>();

//to keep track of multiple RR holders in a single output stream
int rrHolderCount = 0;

for (PublishingStrategyDataHolder holder : outputStream.getPublishingStrategyList()) {

sinkValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" +
outputStream.getStreamName() + (holder.getGroupingField() == null ? "" : ("_" + holder
.getGroupingField())));
Expand All @@ -87,46 +82,33 @@ private void processOutputStreams(String siddhiAppName, String groupName, List<S
partitionKeys.get(holder.getGroupingField()) > holder.getParallelism()) {
continue;
}
//Remove if there is any previous R/R or ALL publishing
sinkValuesMap.remove(siddhiAppName + "_" + outputStream.getStreamName());

//add or replace new partition key and parallelism
partitionKeys.put(holder.getGroupingField(), holder.getParallelism());
//add partition key for header creation
sinkValuesMap.put(ResourceManagerConstants.PARTITION_KEY, holder.getGroupingField());
//use to add destination annotation
List<String> destinations = new ArrayList<>(holder.getParallelism());

for (int i = 0; i < holder.getParallelism(); i++) {
Map<String, String> destinationMap = new HashMap<>(holder.getParallelism());
//create destination annotation
destinationMap.put(ResourceManagerConstants.PARTITION_TOPIC,
sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION)
+ "_" + String.valueOf(i));
//create the distribution full annotation for each strategy
destinations.add(getUpdatedQuery(ResourceManagerConstants.DESTINATION_TOPIC, destinationMap));
}

//create the final distribution annotation
sinkValuesMap.put(ResourceManagerConstants.DESTINATIONS,
StringUtils.join(destinations, ","));
String sinkString = getUpdatedQuery(ResourceManagerConstants.PARTITIONED_MB_SINK_TEMPLATE,
sinkValuesMap);
sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION), sinkString);

} else if (holder.getStrategy() == TransportStrategy.ROUND_ROBIN) {



if (partitionKeys.isEmpty()) {
//if holder uses RR as strategy then unique topic name will be defined
sinkValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" +
outputStream.getStreamName() + "_" + String.valueOf(rrHolderCount));
rrHolderCount++;

String sinkString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_QUEUE_SINK_TEMPLATE,
sinkValuesMap);
sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION), sinkString);
}

} else if (holder.getStrategy() == TransportStrategy.ALL) {
String sinkString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_TOPIC_SINK_TEMPLATE,
Expand All @@ -137,104 +119,79 @@ private void processOutputStreams(String siddhiAppName, String groupName, List<S
Map<String, String> queryValuesMap = new HashMap<>(1);
queryValuesMap.put(outputStream.getStreamName(), StringUtils.join(sinkList.values(), "\n"));
updateQueryList(queryList, queryValuesMap);

}
}

private Map<String, Integer> rrTrackerMap = new HashMap<>();
/**
 * Assigns the JMS transport headers to the input streams of each instance Siddhi
 * application of the given execution group.
 *
 * @param siddhiAppName Name of the initial user defined siddhi application
 * @param queryList     Contains the query of the current execution group replicated
 *                      to the parallelism of the group.
 * @param inputStreams  Collection of current execution group's input streams
 */
private void processInputStreams(String siddhiAppName, String groupName, List<SiddhiQuery> queryList,
private void processInputStreams(String siddhiAppName, List<SiddhiQuery> queryList,
Collection<InputStreamDataHolder> inputStreams) {
Map<String, String> sourceValuesMap = new HashMap<>();


for (InputStreamDataHolder inputStream : inputStreams) {

SubscriptionStrategyDataHolder subscriptionStrategy = inputStream.getSubscriptionStrategy();
sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" +
inputStream.getStreamName() + (inputStream.getSubscriptionStrategy().getPartitionKey() ==
null ? "" : ("_" + inputStream.getSubscriptionStrategy().getPartitionKey())));

if (!inputStream.isUserGiven()) {
if (subscriptionStrategy.getStrategy() == TransportStrategy.FIELD_GROUPING) {

for (int i = 0; i < queryList.size(); i++) {
List<String> sourceQueries = new ArrayList<>();
//consider the each instance source code of single siddhi execution group assigns
//then the particular stream headers produced
List<Integer> partitionNumbers = getPartitionNumbers(queryList.size(), subscriptionStrategy
.getOfferedParallelism(), i);
for (int topicCount : partitionNumbers) {
String topicName = siddhiAppName + "_" + inputStream.getStreamName()
+ "_" + inputStream.getSubscriptionStrategy().getPartitionKey()
+ "_" + Integer.toString(topicCount);
sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, topicName);


String sourceQuery =
getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_TOPIC_SOURCE_TEMPLATE,
sourceValuesMap);
sourceQueries.add(sourceQuery);
}

String combinedQueryHeader = StringUtils.join(sourceQueries, "\n");
//
Map<String, String> queryValuesMap = new HashMap<>(1);
queryValuesMap.put(inputStream.getStreamName(), combinedQueryHeader);
String updatedQuery = getUpdatedQuery(queryList.get(i).getApp(), queryValuesMap);
queryList.get(i).setApp(updatedQuery);

}
} else if (subscriptionStrategy.getStrategy() == TransportStrategy.ROUND_ROBIN) {

String queueName;
int queueCount;
int queueCount = 0;
if (rrTrackerMap.get(inputStream.getStreamName()) != null) {
queueCount = rrTrackerMap.get(inputStream.getStreamName());
queueName = siddhiAppName + "_" + inputStream.getStreamName() + "_"
+ Integer.toString(queueCount);
queueCount += 1;
rrTrackerMap.put(inputStream.getStreamName(), queueCount);

} else {
queueCount = 0;
rrTrackerMap.put(inputStream.getStreamName(), queueCount);
queueName = siddhiAppName + "_" + inputStream.getStreamName() + "_"
+ Integer.toString(queueCount);
queueCount += 1;
rrTrackerMap.put(inputStream.getStreamName(), queueCount);
}

queueName = siddhiAppName + "_" + inputStream.getStreamName() + "_"
+ Integer.toString(queueCount);
queueCount += 1;
rrTrackerMap.put(inputStream.getStreamName(), queueCount);
sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, queueName);
String sourceString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_QUEUE_SOURCE_TEMPLATE,
sourceValuesMap);
Map<String, String> queryValuesMap = new HashMap<>(1);
queryValuesMap.put(inputStream.getStreamName(), sourceString);
updateQueryList(queryList, queryValuesMap);
} else {

sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" +
inputStream.getStreamName());

for (int i = 0; i < queryList.size(); i++) {

for (SiddhiQuery aQueryList : queryList) {
String sourceString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_TOPIC_SOURCE_TEMPLATE,
sourceValuesMap);

Map<String, String> queryValuesMap = new HashMap<>(1);
queryValuesMap.put(inputStream.getStreamName(), sourceString);
String updatedQuery = getUpdatedQuery(queryList.get(i).getApp(), queryValuesMap);
queryList.get(i).setApp(updatedQuery);
String updatedQuery = getUpdatedQuery(aQueryList.getApp(), queryValuesMap);
aQueryList.setApp(updatedQuery);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class SPSiddhiAppCreator extends AbstractSiddhiAppCreator {
protected List<SiddhiQuery> createApps(String siddhiAppName, SiddhiQueryGroup queryGroup) {
String groupName = queryGroup.getName();
String queryTemplate = queryGroup.getSiddhiApp();
List<SiddhiQuery> queryList = generateQueryList(queryTemplate, siddhiAppName, groupName, queryGroup
List<SiddhiQuery> queryList = generateQueryList(queryTemplate, groupName, queryGroup
.getParallelism());
processInputStreams(siddhiAppName, groupName, queryList, queryGroup.getInputStreams().values());
processOutputStreams(siddhiAppName, groupName, queryList, queryGroup.getOutputStreams().values());
Expand Down Expand Up @@ -157,8 +157,8 @@ private void createTopicPartitions(Map<String, Integer> topicParallelismMap) {
log.info("Added " + partitions + " partitions to topic " + topic);
} else if (existingPartitions > partitions) {
if (transportChannelCreationEnabled) {
log.info("Topic " + topic + " has higher number of partitions than expected partition count. Hence"
+ " have to delete the topic and recreate with " + partitions + "partitions.");
log.info("Topic " + topic + " has higher number of partitions than expected partition count. "
+ "Hence have to delete the topic and recreate with " + partitions + "partitions.");
AdminUtils.deleteTopic(zkUtils, topic);
long startTime = System.currentTimeMillis();
while (AdminUtils.topicExists(zkUtils, topic)) {
Expand Down

0 comments on commit ff4f904

Please sign in to comment.