Skip to content

Commit

Permalink
Some code refactoring done.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketharan committed Sep 21, 2018
1 parent ff4f904 commit 01d61f5
Show file tree
Hide file tree
Showing 4 changed files with 474 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,47 +30,51 @@
import java.util.Map;

/**
* Abstract implementation of {@link SiddhiAppCreator}. Developers can use this extension point to implement custom
* Siddhi App Creator based on the distribute implementation.
* Abstract implementation of {@link SiddhiAppCreator}. Developers can use this extension point
* to implement custom Siddhi App Creator based on the distribute implementation.
*/
public abstract class AbstractSiddhiAppCreator implements SiddhiAppCreator {
protected boolean transportChannelCreationEnabled;

public List<DeployableSiddhiQueryGroup> createApps(SiddhiTopology topology) {
transportChannelCreationEnabled = topology.isTransportChannelCreationEnabled();
List<DeployableSiddhiQueryGroup> deployableSiddhiQueryGroupList = new ArrayList<>(topology.getQueryGroupList
().size());
List<DeployableSiddhiQueryGroup> deployableSiddhiQueryGroupList =
new ArrayList<>(topology.getQueryGroupList().size());
for (SiddhiQueryGroup queryGroup : topology.getQueryGroupList()) {
DeployableSiddhiQueryGroup deployableQueryGroup = new DeployableSiddhiQueryGroup(queryGroup.getName(),
queryGroup.isReceiverQueryGroup(),
queryGroup.getParallelism());
DeployableSiddhiQueryGroup deployableQueryGroup =
new DeployableSiddhiQueryGroup(queryGroup.getName(),
queryGroup.isReceiverQueryGroup(),
queryGroup.getParallelism());
deployableQueryGroup.setSiddhiQueries(createApps(topology.getName(), queryGroup));
deployableSiddhiQueryGroupList.add(deployableQueryGroup);
}
return deployableSiddhiQueryGroupList;
}

/**
* This method should return valid concrete Siddhi App/s as Strings. No. of returned Siddhi Apps should equal the
* parallelism count for query group.
* This method should return valid concrete Siddhi App/s as Strings. No. of returned Siddhi
* Apps should equal the parallelism count for query group.
*
* @param queryGroup Input query group to produce Siddhi Apps.
* @return List of valid concrete Siddhi Apps as String.
*/
protected abstract List<SiddhiQuery> createApps(String siddhiAppName, SiddhiQueryGroup queryGroup);
protected abstract List<SiddhiQuery> createApps(String siddhiAppName,
SiddhiQueryGroup queryGroup);

protected List<Integer> getPartitionNumbers(int appParallelism, int availablePartitionCount, int currentAppNum) {
protected List<Integer> getPartitionNumbers(int appParallelism, int availablePartitionCount,
int currentAppNum) {
List<Integer> partitionNumbers = new ArrayList<>();
if (availablePartitionCount == appParallelism) {
partitionNumbers.add(currentAppNum);
return partitionNumbers;
} else {
//availablePartitionCount < appParallelism scenario cannot occur according to design. Hence if
// availablePartitionCount > appParallelism
//availablePartitionCount < appParallelism scenario cannot occur according to design.
// Hence if availablePartitionCount > appParallelism
//// TODO: 10/19/17 improve logic
int partitionsPerNode = availablePartitionCount / appParallelism;
if (currentAppNum + 1 == appParallelism) { //if last app
int remainingPartitions = availablePartitionCount - ((appParallelism - 1) * partitionsPerNode);
int remainingPartitions = availablePartitionCount - ((appParallelism - 1) *
partitionsPerNode);
for (int j = 0; j < remainingPartitions; j++) {
partitionNumbers.add((currentAppNum * partitionsPerNode) + j);
}
Expand All @@ -84,14 +88,16 @@ protected List<Integer> getPartitionNumbers(int appParallelism, int availablePar
}
}

protected List<SiddhiQuery> generateQueryList(String queryTemplate, String queryGroupName, int parallelism) {
protected List<SiddhiQuery> generateQueryList(String queryTemplate, String queryGroupName,
int parallelism) {
List<SiddhiQuery> queries = new ArrayList<>(parallelism);
for (int i = 0; i < parallelism; i++) {
Map<String, String> valuesMap = new HashMap<>(1);
String appName = queryGroupName + "-" + (i + 1);
valuesMap.put(ResourceManagerConstants.APP_NAME, appName);
StrSubstitutor substitutor = new StrSubstitutor(valuesMap);
queries.add(new SiddhiQuery(appName, substitutor.replace(queryTemplate), false));
queries.add(new SiddhiQuery(appName, substitutor.replace(queryTemplate),
false));
}
return queries;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
Expand Down Expand Up @@ -60,8 +60,8 @@ protected List<SiddhiQuery> createApps(String siddhiAppName, SiddhiQueryGroup qu
* @param queryList Contains the query of the current execution group replicated
* to the parallelism of the group.
* @param outputStreams Collection of current execution group's output streams
*
* Assign the jms transport headers for each instance siddhi applications output streams for given execution group
* Assign the jms transport headers for each instance siddhi applications output streams
* for given execution group
*/
private void processOutputStreams(String siddhiAppName, List<SiddhiQuery> queryList,
Collection<OutputStreamDataHolder> outputStreams) {
Expand All @@ -75,49 +75,58 @@ private void processOutputStreams(String siddhiAppName, List<SiddhiQuery> queryL

for (PublishingStrategyDataHolder holder : outputStream.getPublishingStrategyList()) {
sinkValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" +
outputStream.getStreamName() + (holder.getGroupingField() == null ? "" : ("_" + holder
.getGroupingField())));
outputStream.getStreamName() + (holder.getGroupingField() == null ? ""
: ("_" + holder.getGroupingField())));
if (holder.getStrategy() == TransportStrategy.FIELD_GROUPING) {
if (partitionKeys.get(holder.getGroupingField()) != null &&
partitionKeys.get(holder.getGroupingField()) > holder.getParallelism()) {
partitionKeys.get(holder.getGroupingField()) > holder.getParallelism())
{
continue;
}

partitionKeys.put(holder.getGroupingField(), holder.getParallelism());
sinkValuesMap.put(ResourceManagerConstants.PARTITION_KEY, holder.getGroupingField());
sinkValuesMap.put(ResourceManagerConstants.PARTITION_KEY,
holder.getGroupingField());
List<String> destinations = new ArrayList<>(holder.getParallelism());

for (int i = 0; i < holder.getParallelism(); i++) {
Map<String, String> destinationMap = new HashMap<>(holder.getParallelism());
destinationMap.put(ResourceManagerConstants.PARTITION_TOPIC,
sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION)
+ "_" + String.valueOf(i));
destinations.add(getUpdatedQuery(ResourceManagerConstants.DESTINATION_TOPIC, destinationMap));
destinations.add(getUpdatedQuery(ResourceManagerConstants.DESTINATION_TOPIC,
destinationMap));
}

sinkValuesMap.put(ResourceManagerConstants.DESTINATIONS,
StringUtils.join(destinations, ","));
String sinkString = getUpdatedQuery(ResourceManagerConstants.PARTITIONED_MB_SINK_TEMPLATE,
String sinkString =
getUpdatedQuery(ResourceManagerConstants.PARTITIONED_MB_SINK_TEMPLATE,
sinkValuesMap);
sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION), sinkString);
sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION),
sinkString);

} else if (holder.getStrategy() == TransportStrategy.ROUND_ROBIN) {
//if holder uses RR as strategy then unique topic name will be defined
sinkValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" +
outputStream.getStreamName() + "_" + String.valueOf(rrHolderCount));
sinkValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName
+ "_" + outputStream.getStreamName() + "_"
+ String.valueOf(rrHolderCount));
rrHolderCount++;
String sinkString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_QUEUE_SINK_TEMPLATE,
sinkValuesMap);
sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION), sinkString);
String sinkString = getUpdatedQuery(ResourceManagerConstants
.DEFAULT_MB_QUEUE_SINK_TEMPLATE, sinkValuesMap);
sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION)
, sinkString);

} else if (holder.getStrategy() == TransportStrategy.ALL) {
String sinkString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_TOPIC_SINK_TEMPLATE,
sinkValuesMap);
sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION), sinkString);
String sinkString = getUpdatedQuery(ResourceManagerConstants
.DEFAULT_MB_TOPIC_SINK_TEMPLATE, sinkValuesMap);
sinkList.put(sinkValuesMap.get(ResourceManagerConstants.MB_DESTINATION)
, sinkString);
}
}
Map<String, String> queryValuesMap = new HashMap<>(1);
queryValuesMap.put(outputStream.getStreamName(), StringUtils.join(sinkList.values(), "\n"));
queryValuesMap.put(outputStream.getStreamName(),
StringUtils.join(sinkList.values(), "\n"));
updateQueryList(queryList, queryValuesMap);
}
}
Expand All @@ -128,7 +137,6 @@ private void processOutputStreams(String siddhiAppName, List<SiddhiQuery> queryL
* @param queryList Contains the query of the current execution group replicated
* to the parallelism of the group.
* @param inputStreams Collection of current execution group's input streams
*
* Assign the jms transport headers for each instance siddhi applications input
* streams for a given execution group
*/
Expand All @@ -137,31 +145,34 @@ private void processInputStreams(String siddhiAppName, List<SiddhiQuery> queryLi
Map<String, String> sourceValuesMap = new HashMap<>();
for (InputStreamDataHolder inputStream : inputStreams) {

SubscriptionStrategyDataHolder subscriptionStrategy = inputStream.getSubscriptionStrategy();
SubscriptionStrategyDataHolder subscriptionStrategy = inputStream
.getSubscriptionStrategy();
sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" +
inputStream.getStreamName() + (inputStream.getSubscriptionStrategy().getPartitionKey() ==
null ? "" : ("_" + inputStream.getSubscriptionStrategy().getPartitionKey())));
inputStream.getStreamName() + (inputStream.getSubscriptionStrategy()
.getPartitionKey() == null ? "" : ("_" + inputStream.getSubscriptionStrategy()
.getPartitionKey())));
if (!inputStream.isUserGiven()) {
if (subscriptionStrategy.getStrategy() == TransportStrategy.FIELD_GROUPING) {

for (int i = 0; i < queryList.size(); i++) {
List<String> sourceQueries = new ArrayList<>();
List<Integer> partitionNumbers = getPartitionNumbers(queryList.size(), subscriptionStrategy
List<Integer> partitionNumbers = getPartitionNumbers(queryList.size(),
subscriptionStrategy
.getOfferedParallelism(), i);
for (int topicCount : partitionNumbers) {
String topicName = siddhiAppName + "_" + inputStream.getStreamName()
+ "_" + inputStream.getSubscriptionStrategy().getPartitionKey()
+ "_" + Integer.toString(topicCount);
sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, topicName);
String sourceQuery =
getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_TOPIC_SOURCE_TEMPLATE,
sourceValuesMap);
String sourceQuery = getUpdatedQuery(ResourceManagerConstants
.DEFAULT_MB_TOPIC_SOURCE_TEMPLATE, sourceValuesMap);
sourceQueries.add(sourceQuery);
}
String combinedQueryHeader = StringUtils.join(sourceQueries, "\n");
Map<String, String> queryValuesMap = new HashMap<>(1);
queryValuesMap.put(inputStream.getStreamName(), combinedQueryHeader);
String updatedQuery = getUpdatedQuery(queryList.get(i).getApp(), queryValuesMap);
String updatedQuery = getUpdatedQuery(queryList.get(i).getApp()
, queryValuesMap);
queryList.get(i).setApp(updatedQuery);
}
} else if (subscriptionStrategy.getStrategy() == TransportStrategy.ROUND_ROBIN) {
Expand All @@ -176,18 +187,18 @@ private void processInputStreams(String siddhiAppName, List<SiddhiQuery> queryLi
queueCount += 1;
rrTrackerMap.put(inputStream.getStreamName(), queueCount);
sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, queueName);
String sourceString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_QUEUE_SOURCE_TEMPLATE,
sourceValuesMap);
String sourceString = getUpdatedQuery(ResourceManagerConstants
.DEFAULT_MB_QUEUE_SOURCE_TEMPLATE, sourceValuesMap);
Map<String, String> queryValuesMap = new HashMap<>(1);
queryValuesMap.put(inputStream.getStreamName(), sourceString);
updateQueryList(queryList, queryValuesMap);
} else {
sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName + "_" +
inputStream.getStreamName());
sourceValuesMap.put(ResourceManagerConstants.MB_DESTINATION, siddhiAppName
+ "_" + inputStream.getStreamName());

for (SiddhiQuery aQueryList : queryList) {
String sourceString = getUpdatedQuery(ResourceManagerConstants.DEFAULT_MB_TOPIC_SOURCE_TEMPLATE,
sourceValuesMap);
String sourceString = getUpdatedQuery(ResourceManagerConstants
.DEFAULT_MB_TOPIC_SOURCE_TEMPLATE, sourceValuesMap);
Map<String, String> queryValuesMap = new HashMap<>(1);
queryValuesMap.put(inputStream.getStreamName(), sourceString);
String updatedQuery = getUpdatedQuery(aQueryList.getApp(), queryValuesMap);
Expand Down
Loading

0 comments on commit 01d61f5

Please sign in to comment.