Skip to content

Commit

Permalink
Merge pull request #116 from apache/STREAMPIPES-583
Browse files Browse the repository at this point in the history
Streampipes 583
  • Loading branch information
tenthe authored Oct 18, 2022
2 parents c985769 + 2ead922 commit 233cd8c
Show file tree
Hide file tree
Showing 63 changed files with 1,668 additions and 539 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cypress-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
run: sleep 70

- name: 'UI Tests'
uses: cypress-io/github-action@v2
uses: cypress-io/github-action@v4
with:
install: false
wait-on: 'http://localhost/#/login'
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

# test dependencies
ui/cypress/tests/experimental/testJvmArchetype/automated-test
ui/cypress/downloads

## File-based project format:
*.iws
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAccessor;
import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.*;
Expand All @@ -82,26 +87,29 @@ public DataLakeMeasure getById(String measureId) {
return getDataLakeStorage().findOne(measureId);
}

public SpQueryResult getData(ProvidedQueryParams queryParams) throws IllegalArgumentException {
public SpQueryResult getData(ProvidedQueryParams queryParams, boolean ignoreMissingData) throws IllegalArgumentException {
if (queryParams.has(QP_AUTO_AGGREGATE)) {
queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams();
}
Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getSelectQueryParams(queryParams);

if (queryParams.getProvidedParams().containsKey(QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams().get(QP_MAXIMUM_AMOUNT_OF_EVENTS));
return new DataExplorerQueryV4(queryParts, maximumAmountOfEvents).executeQuery();
return new DataExplorerQueryV4(queryParts, maximumAmountOfEvents).executeQuery(ignoreMissingData);
}

if (queryParams.getProvidedParams().containsKey(FOR_ID_KEY)) {
String forWidgetId = queryParams.getProvidedParams().get(FOR_ID_KEY);
return new DataExplorerQueryV4(queryParts, forWidgetId).executeQuery();
return new DataExplorerQueryV4(queryParts, forWidgetId).executeQuery(ignoreMissingData);
} else {
return new DataExplorerQueryV4(queryParts).executeQuery();
return new DataExplorerQueryV4(queryParts).executeQuery(ignoreMissingData);
}
}

public void getDataAsStream(ProvidedQueryParams params, String format, OutputStream outputStream) throws IOException {
public void getDataAsStream(ProvidedQueryParams params,
String format,
boolean ignoreMissingValues,
OutputStream outputStream) throws IOException {
if (!params.has(QP_LIMIT)) {
params.update(QP_LIMIT, 500000);
}
Expand All @@ -121,7 +129,7 @@ public void getDataAsStream(ProvidedQueryParams params, String format, OutputStr
outputStream.write(toBytes("["));
do {
params.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(i));
dataResult = getData(params);
dataResult = getData(params, ignoreMissingValues);

if (dataResult.getTotal() > 0) {
for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
Expand Down Expand Up @@ -170,7 +178,7 @@ public void getDataAsStream(ProvidedQueryParams params, String format, OutputStr

do {
params.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(i));
dataResult = getData(params);
dataResult = getData(params, ignoreMissingValues);
//Send first header
if (dataResult.getTotal() > 0) {
if (isFirstDataObject) {
Expand Down Expand Up @@ -243,7 +251,7 @@ public SpQueryResult deleteData(String measurementID) {

public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getDeleteQueryParams(measurementID, startDate, endDate);
return new DataExplorerQueryV4(queryParts).executeQuery();
return new DataExplorerQueryV4(queryParts).executeQuery(true);
}

public DataLakeConfiguration getDataLakeConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ public Integer getCount(String fieldName) {
countParams.update(QP_COUNT_ONLY, true);
countParams.update(QP_COLUMNS, fieldName);

SpQueryResult result = new DataLakeManagementV4().getData(countParams);
SpQueryResult result = new DataLakeManagementV4().getData(countParams, true);

return result.getTotal() > 0 ? ((Double) result.getAllDataSeries().get(0).getRows().get(0).get(1)).intValue() : 0;
}

private SpQueryResult fireQuery(ProvidedQueryParams params) {
return dataLakeManagement.getData(params);
return dataLakeManagement.getData(params, true);
}

private int getAggregationValue(SpQueryResult newest, SpQueryResult oldest) throws ParseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class SupportedDataLakeQueryParameters {
public static final String QP_TIME_INTERVAL = "timeInterval";
public static final String QP_FORMAT = "format";
public static final String QP_CSV_DELIMITER = "delimiter";

public static final String QP_MISSING_VALUE_BEHAVIOUR = "missingValueBehaviour";
public static final String QP_COUNT_ONLY = "countOnly";
public static final String QP_AUTO_AGGREGATE = "autoAggregate";
public static final String QP_FILTER = "filter";
Expand All @@ -54,6 +56,7 @@ public class SupportedDataLakeQueryParameters {
QP_CSV_DELIMITER,
QP_COUNT_ONLY,
QP_AUTO_AGGREGATE,
QP_MISSING_VALUE_BEHAVIOUR,
QP_FILTER,
QP_MAXIMUM_AMOUNT_OF_EVENTS
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public DataExplorerQueryV4(Map<String, QueryParamsV4> params, int maximumAmountO
this.maximumAmountOfEvents = maximumAmountOfEvents;
}

public SpQueryResult executeQuery() throws RuntimeException {
public SpQueryResult executeQuery(boolean ignoreMissingValues) throws RuntimeException {
InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
List<QueryElement<?>> queryElements = getQueryElements();

Expand All @@ -93,53 +93,60 @@ public SpQueryResult executeQuery() throws RuntimeException {
QueryResult result = influxDB.query(query);
LOG.debug("Data Lake Query Result: " + result.toString());

SpQueryResult dataResult = postQuery(result);
SpQueryResult dataResult = postQuery(result, ignoreMissingValues);

influxDB.close();
return dataResult;
}

public SpQueryResult executeQuery(Query query) {
public SpQueryResult executeQuery(Query query, boolean ignoreMissingValues) {
InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
QueryResult result = influxDB.query(query);
SpQueryResult dataResult = postQuery(result);
SpQueryResult dataResult = postQuery(result, ignoreMissingValues);
influxDB.close();

return dataResult;
}

private double getAmountOfResults(QueryResult countQueryResult) {
if (countQueryResult.getResults().get(0).getSeries() != null &&
countQueryResult.getResults().get(0).getSeries().get(0).getValues() != null) {
if (countQueryResult.getResults().get(0).getSeries() != null
&& countQueryResult.getResults().get(0).getSeries().get(0).getValues() != null) {
return (double) countQueryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(1);
} else {
return 0.0;
}
}


protected DataSeries convertResult(QueryResult.Series series) {
protected DataSeries convertResult(QueryResult.Series series,
boolean ignoreMissingValues) {
List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues();

List<List<Object>> resultingValues = new ArrayList<>();

values.forEach(v -> {
if (!v.contains(null)) {
if (ignoreMissingValues) {
if (!v.contains(null)) {
resultingValues.add(v);
}
} else {
resultingValues.add(v);
}

});

return new DataSeries(values.size(), resultingValues, columns, series.getTags());
}

protected SpQueryResult postQuery(QueryResult queryResult) throws RuntimeException {
protected SpQueryResult postQuery(QueryResult queryResult,
boolean ignoreMissingValues) throws RuntimeException {
SpQueryResult result = new SpQueryResult();

if (queryResult.getResults().get(0).getSeries() != null) {
result.setTotal(queryResult.getResults().get(0).getSeries().size());
queryResult.getResults().get(0).getSeries().forEach(rs -> {
DataSeries series = convertResult(rs);
DataSeries series = convertResult(rs, ignoreMissingValues);
result.setHeaders(series.getHeaders());
result.addDataResult(series);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Response getData(@Parameter(in = ParameterIn.PATH, description = "the id
} else {
ProvidedQueryParams sanitizedParams = populate(measurementID, queryParams);
try {
SpQueryResult result = this.dataLakeManagement.getData(sanitizedParams);
SpQueryResult result = this.dataLakeManagement.getData(sanitizedParams, true);
return ok(result);
} catch (RuntimeException e) {
return badRequest(StreamPipesErrorMessage.from(e));
Expand All @@ -175,7 +175,7 @@ public Response getData(List<Map<String, String>> queryParams) {
var results = queryParams
.stream()
.map(qp -> new ProvidedQueryParams(qp.get("measureName"), qp))
.map(params -> this.dataLakeManagement.getData(params))
.map(params -> this.dataLakeManagement.getData(params, true))
.collect(Collectors.toList());

return ok(results);
Expand All @@ -201,20 +201,33 @@ public Response downloadData(@Parameter(in = ParameterIn.PATH, description = "th
, @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam(QP_TIME_INTERVAL) String timeInterval
, @Parameter(in = ParameterIn.QUERY, description = "format specification (csv, json - default is csv) for data download") @QueryParam(QP_FORMAT) String format
, @Parameter(in = ParameterIn.QUERY, description = "csv delimiter (comma or semicolon)") @QueryParam(QP_CSV_DELIMITER) String csvDelimiter
, @Parameter(in = ParameterIn.QUERY, description = "missingValueBehaviour (ignore or empty)") @QueryParam(QP_MISSING_VALUE_BEHAVIOUR) String missingValueBehaviour
, @Parameter(in = ParameterIn.QUERY, description = "filter conditions (a comma-separated list of filter conditions such as [field,operator,condition])") @QueryParam(QP_FILTER) String filter
, @Context UriInfo uriInfo) {

MultivaluedMap<String, String> queryParams = uriInfo.getQueryParameters();

if (! (checkProvidedQueryParams(queryParams))) {
if (!(checkProvidedQueryParams(queryParams))) {
return badRequest();
} else {
ProvidedQueryParams sanitizedParams = populate(measurementID, queryParams);
if (format == null) {
format = "csv";
}

boolean ignoreMissingValues;
if ("ignore".equals(missingValueBehaviour)) {
ignoreMissingValues = true;
} else {
ignoreMissingValues = false;
}

String outputFormat = format;
StreamingOutput streamingOutput = output -> dataLakeManagement.getDataAsStream(sanitizedParams, outputFormat, output);
StreamingOutput streamingOutput = output -> dataLakeManagement.getDataAsStream(
sanitizedParams,
outputFormat,
ignoreMissingValues,
output);

return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM).
header("Content-Disposition", "attachment; filename=\"datalake." + outputFormat + "\"")
Expand Down
6 changes: 6 additions & 0 deletions ui/angular.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"root": "",
"sourceRoot": "src",
"projectType": "application",
"prefix": "sp",
"architect": {
"build": {
"builder": "@angular-builders/custom-webpack:browser",
Expand Down Expand Up @@ -173,5 +174,10 @@
}
}
},
"schematics": {
"@schematics/angular:component": {
"style": "scss"
}
},
"defaultProject": "app"
}
27 changes: 27 additions & 0 deletions ui/cypress.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// tslint:disable-next-line:no-implicit-dependencies
import { defineConfig } from 'cypress';

export default defineConfig({
projectId: 'q1jdu2',
downloadsFolder: 'cypress/downloads',
env: {
TAKE_SCREENSHOT: 'false',
},
retries: {
runMode: 1,
openMode: 0,
},
trashAssetsBeforeRuns: true,
videoCompression: false,
viewportWidth: 1920,
viewportHeight: 1080,
e2e: {
// We've imported your old cypress plugins here.
// You may want to clean this up later by importing these.
setupNodeEvents(on, config) {
return require('./cypress/plugins/index.ts')(on, config);
},
specPattern: 'cypress/tests/**/*.{js,jsx,ts,tsx}',
baseUrl: 'http://localhost:80',
},
});
16 changes: 0 additions & 16 deletions ui/cypress.json

This file was deleted.

2 changes: 0 additions & 2 deletions ui/cypress/downloads/sp_adaptertotestschemarules.csv

This file was deleted.

5 changes: 5 additions & 0 deletions ui/cypress/fixtures/dataDownloadDialog/input.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[{"timestamp": 1623871499055, "v1": 4.1, "v2": "abc", "v3": true, "v4": 1},
{"timestamp": 1623871500059, "v1": 4.2, "v2": "abc", "v3": false, "v4": 2},
{"timestamp": 1623871507091, "v1": 4.3, "v4": 3},
{"timestamp": 1623871508093, "v1": 4.4, "v2": "abc", "v3": true, "v4": 4},
{"timestamp": 1623871508095, "v1": 4.5, "v4": 5}]
6 changes: 6 additions & 0 deletions ui/cypress/fixtures/dataDownloadDialog/testCsvComma.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
time,v1,v2,v3,v4
1623871499055,4.099999904632568,abc,true,1.0
1623871500059,4.199999809265137,abc,false,2.0
1623871507091,4.300000190734863,,,3.0
1623871508093,4.400000095367432,abc,true,4.0
1623871508095,4.5,,,5.0
6 changes: 6 additions & 0 deletions ui/cypress/fixtures/dataDownloadDialog/testCsvSemicolon.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
time;v1;v2;v3;v4
1623871499055;4.099999904632568;abc;true;1.0
1623871500059;4.199999809265137;abc;false;2.0
1623871507091;4.300000190734863;;;3.0
1623871508093;4.400000095367432;abc;true;4.0
1623871508095;4.5;;;5.0
1 change: 1 addition & 0 deletions ui/cypress/fixtures/dataDownloadDialog/testJson.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"time": 1623871499055,"v1": 4.099999904632568,"v2": "abc","v3": true,"v4": 1.0},{"time": 1623871500059,"v1": 4.199999809265137,"v2": "abc","v3": false,"v4": 2.0},{"time": 1623871507091,"v1": 4.300000190734863,"v2": null,"v3": null,"v4": 3.0},{"time": 1623871508093,"v1": 4.400000095367432,"v2": "abc","v3": true,"v4": 4.0},{"time": 1623871508095,"v1": 4.5,"v2": null,"v3": null,"v4": 5.0}]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
time;v1;v2;v3;v4
1623871499055;4.099999904632568;abc;true;1.0
1623871500059;4.199999809265137;abc;false;2.0
1623871508093;4.400000095367432;abc;true;4.0
Loading

0 comments on commit 233cd8c

Please sign in to comment.