Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve IncrementalDataPurger logic and Persisted Aggregation #1724

Merged
Merged 2 commits on Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
# limitations under the License.

language: java

jdk:
- openjdk8
after_success:
- bash <(curl -s https://codecov.io/bash)

script: mvn clean install -q -B -V | grep -v DEBUG; exit "${PIPESTATUS[0]}";
cache:
directories:
- $HOME/.m2
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.execution.query.OnDemandQuery;
import io.siddhi.query.api.execution.query.input.store.InputStore;
import io.siddhi.query.api.execution.query.selection.OrderByAttribute;
import io.siddhi.query.api.execution.query.selection.OutputAttribute;
import io.siddhi.query.api.execution.query.selection.Selector;
import io.siddhi.query.api.expression.Expression;
Expand Down Expand Up @@ -408,6 +409,7 @@ private OnDemandQuery getOnDemandQuery(Table table, long timeFrom, long timeTo)
outputAttributes.add(new OutputAttribute(new Variable(AGG_START_TIMESTAMP_COL)));
Selector selector = Selector.selector().addSelectionList(outputAttributes)
.groupBy(Expression.variable(AGG_START_TIMESTAMP_COL))
.orderBy(Expression.variable(AGG_START_TIMESTAMP_COL), OrderByAttribute.Order.DESC)
.limit(Expression.value(1));
InputStore inputStore;
if (timeTo != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void process(ComplexEventChunk complexEventChunk) {
if (outputData.length == 3) {
Date fromTime = new Date((Long) outputData[0]);
Date toTime = new Date((Long) outputData[1]);
log.info("Aggregation executed for duration " + duration + " from " + fromTime + " to " +
log.debug("Aggregation executed for duration " + duration + " from " + fromTime + " to " +
toTime + " and " + outputData[2] + " records has been successfully updated ");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,10 @@ public final class SiddhiConstants {
public static final String PLACEHOLDER_QUERY = "{{QUERY}}";
public static final String PLACEHOLDER_SELECTORS = "{{SELECTORS}}";
public static final String PLACEHOLDER_CONDITION = "{{CONDITION}}";
public static final String PLACEHOLDER_INNER_QUERY = "{{INNER_QUERY}}";
public static final String PLACEHOLDER_INNER_QUERY_1 = "{{INNER_QUERY_1}}";
public static final String PLACEHOLDER_INNER_QUERY_2 = "{{INNER_QUERY_2}}";
public static final String PLACEHOLDER_FROM_CONDITION = "{{FROM_CONDITION}}";
public static final String PLACEHOLDER_ON_CONDITION = "{{ON_CONDITION}}";

public static final String INSERT_TO_TABLE_NAME = "TO_TABLE_NAME";
public static final String FROM_TABLE_NAME = "FROM_TABLE_NAME";
Expand All @@ -172,12 +174,14 @@ public final class SiddhiConstants {
public static final String TO_TIMESTAMP = "TO_TIMESTAMP";
public static final String SUB_SELECT_QUERY_REF_T2 = "t2";
public static final String SUB_SELECT_QUERY_REF_T1 = "t1";
public static final String INNER_SELECT_QUERY_REF_T3 = "t3";
public static final String EQUALS = " = ";
public static final String SQL_AS = " AS ";
public static final String SQL_AND = " AND ";
public static final String SQL_ON = " ON ";
public static final String SQL_SELECT = " SELECT ";
public static final String SQL_WHERE = " WHERE ";
public static final String SQL_FROM = " FROM ";
public static final String SQL_NOT_NULL = " IS NOT NULL ";

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import static io.siddhi.core.util.SiddhiConstants.EQUALS;
import static io.siddhi.core.util.SiddhiConstants.FROM_TIMESTAMP;
import static io.siddhi.core.util.SiddhiConstants.FUNCTION_NAME_CUD;
import static io.siddhi.core.util.SiddhiConstants.INNER_SELECT_QUERY_REF_T3;
import static io.siddhi.core.util.SiddhiConstants.METRIC_INFIX_AGGREGATIONS;
import static io.siddhi.core.util.SiddhiConstants.METRIC_TYPE_FIND;
import static io.siddhi.core.util.SiddhiConstants.METRIC_TYPE_INSERT;
Expand All @@ -125,12 +126,15 @@
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_CONDITION;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_DURATION;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_FROM_CONDITION;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_INNER_QUERY;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_INNER_QUERY_1;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_INNER_QUERY_2;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_ON_CONDITION;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_SELECTORS;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_TABLE_NAME;
import static io.siddhi.core.util.SiddhiConstants.SQL_AND;
import static io.siddhi.core.util.SiddhiConstants.SQL_AS;
import static io.siddhi.core.util.SiddhiConstants.SQL_FROM;
import static io.siddhi.core.util.SiddhiConstants.SQL_NOT_NULL;
import static io.siddhi.core.util.SiddhiConstants.SQL_SELECT;
import static io.siddhi.core.util.SiddhiConstants.SQL_WHERE;
import static io.siddhi.core.util.SiddhiConstants.SUB_SELECT_QUERY_REF_T1;
Expand Down Expand Up @@ -1193,6 +1197,7 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
StringJoiner innerSelectT2ColumnJoiner = new StringJoiner(", ", SQL_SELECT, " ");

StringJoiner onConditionBuilder = new StringJoiner(SQL_AND);
StringJoiner subSelectT2OnConditionBuilder = new StringJoiner(SQL_AND);

StringJoiner groupByQueryBuilder = new StringJoiner(", ");
StringJoiner finalSelectQuery = new StringJoiner(" ");
Expand All @@ -1204,6 +1209,8 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
String innerFromClause = SQL_FROM + parentAggregationTable.getTableDefinition().getId();
String innerWhereFilterClause;
String groupByClause;
String innerT2WhereCondition;


StringJoiner innerT2Query = new StringJoiner(" ");
StringJoiner subQueryT1 = new StringJoiner(" ");
Expand Down Expand Up @@ -1232,8 +1239,14 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
groupByQueryBuilder.add(variable.getAttributeName());
onConditionBuilder.add(SUB_SELECT_QUERY_REF_T1 + "." + variable.getAttributeName() +
EQUALS + SUB_SELECT_QUERY_REF_T2 + "." + variable.getAttributeName());
subSelectT2OnConditionBuilder.add(parentAggregationTable.getTableDefinition().getId() + "." +
variable.getAttributeName() + EQUALS + INNER_SELECT_QUERY_REF_T3 + "." +
variable.getAttributeName());
});

innerT2WhereCondition = INNER_SELECT_QUERY_REF_T3 + "." + groupByVariableList.get(0).getAttributeName() +
SQL_NOT_NULL;

for (ExpressionExecutor expressionExecutor : expressionExecutors) {

if (expressionExecutor instanceof VariableExpressionExecutor) {
Expand All @@ -1244,11 +1257,16 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
} else if (!variableExpressionExecutor.getAttribute().getName().equals(AGG_EXTERNAL_TIMESTAMP_COL)) {
subSelectT2ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName());
if (groupByColumnNames.contains(variableExpressionExecutor.getAttribute().getName())) {
subSelectT2ColumnJoiner.add(INNER_SELECT_QUERY_REF_T3 + "." +
variableExpressionExecutor.getAttribute().getName() + SQL_AS +
variableExpressionExecutor.getAttribute().getName()) ;
outerSelectColumnJoiner.add(SUB_SELECT_QUERY_REF_T1 + "." +
variableExpressionExecutor.getAttribute().getName() +
SQL_AS + attributeList.get(i).getName());
subSelectT1ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName());
innerSelectT2ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName());
} else {
subSelectT2ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName());
outerSelectColumnJoiner.add(SUB_SELECT_QUERY_REF_T2 + "." +
variableExpressionExecutor.getAttribute().getName() +
SQL_AS + attributeList.get(i).getName());
Expand All @@ -1274,10 +1292,14 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
} else if (expressionExecutor instanceof MaxAttributeAggregatorExecutor) {
if (attributeList.get(i).getName().equals(AGG_LAST_TIMESTAMP_COL)) {
innerSelectT2ColumnJoiner.add(dbAggregationSelectFunctionTemplates.getMaxFunction().
replace(PLACEHOLDER_COLUMN, attributeList.get(i).getName()));
subSelectT2ColumnJoiner.add(attributeList.get(i).getName());
replace(PLACEHOLDER_COLUMN, attributeList.get(i).getName()) + SQL_AS + attributeList.get(i).getName());
subSelectT2ColumnJoiner.add(INNER_SELECT_QUERY_REF_T3 + "." + attributeList.get(i).getName() +
SQL_AS + attributeList.get(i).getName());
outerSelectColumnJoiner.add(SUB_SELECT_QUERY_REF_T2 + "." + attributeList.get(i).getName() +
SQL_AS + attributeList.get(i).getName());
subSelectT2OnConditionBuilder.add(parentAggregationTable.getTableDefinition().getId() + "." +
attributeList.get(i).getName() + EQUALS + INNER_SELECT_QUERY_REF_T3 + "." +
attributeList.get(i).getName());
} else {
outerSelectColumnJoiner.add(SUB_SELECT_QUERY_REF_T1 + "." + attributeList.get(i).getName() + SQL_AS +
attributeList.get(i).getName());
Expand Down Expand Up @@ -1320,13 +1342,15 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
subQueryT2.add(dbAggregationSelectQueryTemplate.getSelectQueryWithInnerSelect().
replace(PLACEHOLDER_SELECTORS, subSelectT2ColumnJoiner.toString()).
replace(PLACEHOLDER_TABLE_NAME, parentAggregationTable.getTableDefinition().getId()).
replace(PLACEHOLDER_COLUMN, AGG_LAST_TIMESTAMP_COL).
replace(PLACEHOLDER_INNER_QUERY, innerT2Query.toString()));
replace(PLACEHOLDER_INNER_QUERY_2, innerT2Query.toString()).
replace(PLACEHOLDER_ON_CONDITION, subSelectT2OnConditionBuilder.toString()).
replace(PLACEHOLDER_CONDITION, innerT2WhereCondition));


finalSelectQuery.add(dbAggregationSelectQueryTemplate.getJoinClause().
replace(PLACEHOLDER_SELECTORS, outerSelectColumnJoiner.toString()).
replace(PLACEHOLDER_FROM_CONDITION, subQueryT1.toString()).
replace(PLACEHOLDER_INNER_QUERY, subQueryT2.toString()).
replace(PLACEHOLDER_INNER_QUERY_1, subQueryT2.toString()).
replace(PLACEHOLDER_CONDITION, onConditionBuilder.toString()));

completeQuery.add(insertIntoQueryBuilder.toString()).add(finalSelectQuery.toString());
Expand Down Expand Up @@ -1389,7 +1413,7 @@ private static Database getDatabaseType(ConfigManager configManager, String data
return Database.MYSQL;
} else if (databaseType.contains("oracle")) {
return Database.ORACLE;
} else if (databaseType.contains("mssql")) {
} else if (databaseType.contains("mssql") || databaseType.contains("sqlserver")) {
return Database.MSSQL;
} else if (databaseType.contains("postgres")) {
return Database.PostgreSQL;
Expand Down
39 changes: 26 additions & 13 deletions modules/siddhi-core/src/main/resources/db-aggregation-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
<recordInsertQuery>INSERT INTO {{TABLE_NAME}} ({{COLUMNS}})</recordInsertQuery>
<selectClause>SELECT {{SELECTORS}} FROM {{TABLE_NAME}}</selectClause>
<whereClause>WHERE {{CONDITION}}</whereClause>
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} WHERE {{COLUMN}} IN ({{INNER_QUERY}})
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} LEFT JOIN ({{INNER_QUERY_2}}) AS t3 ON
{{ON_CONDITION}} WHERE {{CONDITION}}
</selectQueryWithInnerSelect>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY}}) AS t2 ON {{CONDITION}}</joinClause>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY_1}}) AS t2 ON
{{CONDITION}}
</joinClause>
<groupByClause>GROUP BY {{COLUMNS}}</groupByClause>
</selectQueryTemplate>
<selectQueryFunctions>
<sumFunction>SUM({{COLUMN}})</sumFunction>
<countFunction>COUNT({{COLUMN}})</countFunction>
<maxFunction>MAX({{COLUMN}})</maxFunction>
<minFunction>MIN({{COLUMN}})</minFunction>
<timeConversionFunction>UNIX_TIMESTAMP(from_unixtime({{COLUMN}}/1000,"{{DURATION}}"))*1000</timeConversionFunction>
<timeConversionFunction>UNIX_TIMESTAMP(from_unixtime({{COLUMN}}/1000,"{{DURATION}}"))*1000
</timeConversionFunction>
</selectQueryFunctions>
<timeConversionDurationMapping>
<day>%Y-%m-%d</day>
Expand All @@ -41,9 +45,12 @@
<recordInsertQuery>INSERT INTO {{TABLE_NAME}} ({{COLUMNS}})</recordInsertQuery>
<selectClause>SELECT {{SELECTORS}} FROM {{TABLE_NAME}}</selectClause>
<whereClause>WHERE {{CONDITION}}</whereClause>
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} WHERE {{COLUMN}} IN ({{INNER_QUERY}})
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} LEFT JOIN ({{INNER_QUERY_2}}) t3 ON
{{ON_CONDITION}} WHERE {{CONDITION}}
</selectQueryWithInnerSelect>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) t1 JOIN ({{INNER_QUERY}}) t2 ON {{CONDITION}}</joinClause>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) t1 JOIN ({{INNER_QUERY_1}}) t2 ON
{{CONDITION}}
</joinClause>
<groupByClause>GROUP BY {{COLUMNS}}</groupByClause>
</selectQueryTemplate>
<selectQueryFunctions>
Expand Down Expand Up @@ -75,22 +82,25 @@
<bigStringType>CLOB</bigStringType>
</typeMapping>
</database>
<database name="Microsoft SQL Server">
<database name="MSSQL">
<selectQueryTemplate>
<recordInsertQuery>INSERT INTO {{TABLE_NAME}} ({{COLUMNS}})</recordInsertQuery>
<selectClause>SELECT {{SELECTORS}} FROM {{TABLE_NAME}}</selectClause>
<whereClause>WHERE {{CONDITION}}</whereClause>
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} WHERE {{COLUMN}} IN ({{INNER_QUERY}})
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} LEFT JOIN ({{INNER_QUERY_2}}) AS t3 ON
{{ON_CONDITION}} WHERE {{CONDITION}}
</selectQueryWithInnerSelect>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY}}) AS t2 ON {{CONDITION}}</joinClause>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY_1}}) AS t2 ON
{{CONDITION}}
</joinClause>
<groupByClause>GROUP BY {{COLUMNS}}</groupByClause>
</selectQueryTemplate>
<selectQueryFunctions>
<sumFunction>SUM({{COLUMN}})</sumFunction>
<countFunction>COUNT({{COLUMN}})</countFunction>
<maxFunction>MAX({{COLUMN}})</maxFunction>
<minFunction>MIN({{COLUMN}})</minFunction>
<timeConversionFunction>DATEDIFF(second , '19700101',DATEADD({{DURATION}} , DATEDIFF({{DURATION}} , 0, DATEADD(ss, {{COLUMN}}/1000, '19700101')), 0))*1000</timeConversionFunction>
<timeConversionFunction>CAST(DATEDIFF(second , '19700101',DATEADD({{DURATION}} , DATEDIFF({{DURATION}} , 0, DATEADD(ss, {{COLUMN}}/1000, '19700101')), 0)) AS bigint)*1000</timeConversionFunction>
</selectQueryFunctions>
<timeConversionDurationMapping>
<day>dd</day>
Expand All @@ -116,17 +126,20 @@
<recordInsertQuery>INSERT INTO {{TABLE_NAME}} ({{COLUMNS}})</recordInsertQuery>
<selectClause>SELECT {{SELECTORS}} FROM {{TABLE_NAME}}</selectClause>
<whereClause>WHERE {{CONDITION}}</whereClause>
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} WHERE {{COLUMN}} IN ({{INNER_QUERY}})
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} LEFT JOIN ({{INNER_QUERY_2}}) AS t3 ON
{{ON_CONDITION}} WHERE {{CONDITION}}
</selectQueryWithInnerSelect>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY}}) AS t2 ON {{CONDITION}}</joinClause>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY_1}}) AS t2 ON
{{CONDITION}}
</joinClause>
<groupByClause>GROUP BY {{COLUMNS}}</groupByClause>
</selectQueryTemplate>
<selectQueryFunctions>
<sumFunction>SUM({{COLUMN}})</sumFunction>
<countFunction>COUNT({{COLUMN}})</countFunction>
<maxFunction>MAX({{COLUMN}})</maxFunction>
<minFunction>MIN({{COLUMN}})</minFunction>
<timeConversionFunction>EXTRACT(epoch from date_trunc('{{DURATION}}', to_timestamp({{COLUMN}}/1000)))*1000;
<timeConversionFunction>EXTRACT(epoch from date_trunc('{{DURATION}}', to_timestamp({{COLUMN}}/1000)))*1000;
</timeConversionFunction>
</selectQueryFunctions>
<timeConversionDurationMapping>
Expand All @@ -147,4 +160,4 @@
<stringType>VARCHAR</stringType>
</typeMapping>
</database>
</rdbms-table-configuration>
</rdbms-table-configuration>