From 9b1313230a9eb4292fbd3b83dbac6fa029b6a610 Mon Sep 17 00:00:00 2001 From: Philipp Zehnder Date: Fri, 3 Jan 2025 17:50:15 +0100 Subject: [PATCH] refactor(#3390): Remove asset links when deleting a measurement --- .../api/IDataExplorerSchemaManagement.java | 2 + .../DataExplorerSchemaManagement.java | 5 + .../DataExplorerSchemaManagementTest.java | 50 +++++-- streampipes-platform-services/pom.xml | 5 + .../streampipes/ps/DataLakeResourceV4.java | 135 ++++++++++++------ 5 files changed, 139 insertions(+), 58 deletions(-) diff --git a/streampipes-data-explorer-api/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java b/streampipes-data-explorer-api/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java index a210b515b3..b6c4138ff0 100644 --- a/streampipes-data-explorer-api/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java +++ b/streampipes-data-explorer-api/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java @@ -28,6 +28,8 @@ public interface IDataExplorerSchemaManagement { DataLakeMeasure getById(String elementId); + DataLakeMeasure getByMeasureName(String measureName); + DataLakeMeasure createOrUpdateMeasurement(DataLakeMeasure measure); void deleteMeasurement(String elementId); diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java index ef3db4c7d3..ad90fb8664 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java @@ -50,6 +50,11 @@ public DataLakeMeasure getById(String elementId) { return dataLakeStorage.getElementById(elementId); } + @Override + public DataLakeMeasure getByMeasureName(String measureName) { + return this.getExistingMeasureByName(measureName).orElse(null); + } + /** * For new measurements an entry is generated in the database. For existing measurements the schema is updated * according to the update strategy defined by the measurement. diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagementTest.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagementTest.java index 4dc34531ce..52ded2f550 100644 --- a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagementTest.java +++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagementTest.java @@ -35,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -47,22 +48,49 @@ public class DataExplorerSchemaManagementTest { public static final String OLD_PROPERTY = "oldProperty"; private CRUDStorage dataLakeStorageMock; + private DataExplorerSchemaManagement dataExplorerSchemaManagement; @BeforeEach public void setUp() { dataLakeStorageMock = mock(CRUDStorage.class); + dataExplorerSchemaManagement = new DataExplorerSchemaManagement(dataLakeStorageMock); } + + @Test + public void getByMeasureName_ReturnsMeasureWhenMeasureExists() { + var measureName = "existingMeasure"; + var existingMeasure = new DataLakeMeasure(); + existingMeasure.setMeasureName(measureName); + + when(dataLakeStorageMock.findAll()).thenReturn(List.of(existingMeasure)); + + var result = dataExplorerSchemaManagement.getByMeasureName(measureName); + + assertEquals(existingMeasure, result); + } + + @Test + public void getByMeasureName_ReturnsNullWhenMeasureDoesNotExist() { + var measureName = "nonExistingMeasure"; + + when(dataLakeStorageMock.findAll()).thenReturn(List.of()); + + var result = dataExplorerSchemaManagement.getByMeasureName(measureName); + + assertNull(result); + } + + @Test - public void createMeasurementThatNotExisted() { + public void createOrUpdateMeasuremente_ThatNotExisted() { when(dataLakeStorageMock.findAll()).thenReturn(List.of()); - var schemaManagement = new DataExplorerSchemaManagement(dataLakeStorageMock); var oldMeasure = getSampleMeasure( DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA, List.of() ); - var resultingMeasure = schemaManagement.createOrUpdateMeasurement(oldMeasure); + var resultingMeasure = dataExplorerSchemaManagement.createOrUpdateMeasurement(oldMeasure); assertEquals(oldMeasure.getMeasureName(), resultingMeasure.getMeasureName()); verify(dataLakeStorageMock, Mockito.times(1)) @@ -71,7 +99,7 @@ public void createMeasurementThatNotExisted() { @Test - public void createMeasurementWithUpdateStrategy() { + public void createOrUpdateMeasuremente_WithUpdateStrategy() { var oldMeasure = getSampleMeasure( DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA, @@ -82,11 +110,10 @@ public void createMeasurementWithUpdateStrategy() { when(dataLakeStorageMock.findAll()).thenReturn(List.of(oldMeasure)); when(dataLakeStorageMock.getElementById(any())).thenReturn(oldMeasure); - var schemaManagement = new DataExplorerSchemaManagement(dataLakeStorageMock); var newMeasure = getNewMeasure(DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA); - var resultMeasure = schemaManagement.createOrUpdateMeasurement(newMeasure); + var resultMeasure = dataExplorerSchemaManagement.createOrUpdateMeasurement(newMeasure); assertEquals(newMeasure.getMeasureName(), resultMeasure.getMeasureName()); verify(dataLakeStorageMock, Mockito.times(1)) @@ -98,7 +125,7 @@ public void createMeasurementWithUpdateStrategy() { @Test - public void createMeasurementWithExtendSchemaStrategy() { + public void createOrUpdateMeasuremente_WithExtendSchemaStrategy() { var oldMeasure = getSampleMeasure( DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA, @@ -108,10 +135,9 @@ public void createMeasurementWithExtendSchemaStrategy() { ); when(dataLakeStorageMock.findAll()).thenReturn(List.of(oldMeasure)); when(dataLakeStorageMock.getElementById(any())).thenReturn(oldMeasure); - var schemaManagement = new DataExplorerSchemaManagement(dataLakeStorageMock); var newMeasure = getNewMeasure(DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA); - var resultMeasure = schemaManagement.createOrUpdateMeasurement(newMeasure); + var resultMeasure = dataExplorerSchemaManagement.createOrUpdateMeasurement(newMeasure); assertEquals(newMeasure.getMeasureName(), resultMeasure.getMeasureName()); verify(dataLakeStorageMock, Mockito.times(1)).updateElement(any()); @@ -121,7 +147,7 @@ public void createMeasurementWithExtendSchemaStrategy() { @Test - public void createMeasurementWithExtendSchemaStrategyAndDifferentPropertyTypes() { + public void createOrUpdateMeasuremente_WithExtendSchemaStrategyAndDifferentPropertyTypes() { var oldMeasure = getSampleMeasure( DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA, List.of( @@ -133,11 +159,9 @@ public void createMeasurementWithExtendSchemaStrategyAndDifferentPropertyTypes() when(dataLakeStorageMock.findAll()).thenReturn(List.of(oldMeasure)); when(dataLakeStorageMock.getElementById(any())).thenReturn(oldMeasure); - var schemaManagement = new DataExplorerSchemaManagement(dataLakeStorageMock); - var newMeasure = getNewMeasure(DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA); - var resultMeasure = schemaManagement.createOrUpdateMeasurement(newMeasure); + var resultMeasure = dataExplorerSchemaManagement.createOrUpdateMeasurement(newMeasure); assertEquals(newMeasure.getMeasureName(), resultMeasure.getMeasureName()); verify(dataLakeStorageMock, Mockito.times(1)).updateElement(any()); assertEquals( diff --git a/streampipes-platform-services/pom.xml b/streampipes-platform-services/pom.xml index c771c9fc65..b7722b2276 100644 --- a/streampipes-platform-services/pom.xml +++ b/streampipes-platform-services/pom.xml @@ -28,6 +28,11 @@ streampipes-platform-services + + org.apache.streampipes + streampipes-asset-model-management + 0.98.0-SNAPSHOT + org.apache.streampipes streampipes-data-explorer-export diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java index d4bda50882..6953a10836 100644 --- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java +++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java @@ -18,6 +18,7 @@ package org.apache.streampipes.ps; +import org.apache.streampipes.assetmodel.management.AssetModelHelper; import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement; import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement; import org.apache.streampipes.dataexplorer.export.OutputFormat; @@ -38,6 +39,8 @@ import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -52,6 +55,7 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -83,6 +87,8 @@ public class DataLakeResourceV4 extends AbstractRestResource { private final IDataExplorerQueryManagement dataExplorerQueryManagement; private final IDataExplorerSchemaManagement dataExplorerSchemaManagement; + private static final Logger LOG = LoggerFactory.getLogger(DataLakeResourceV4.class); + public DataLakeResourceV4() { this.dataExplorerSchemaManagement = new DataExplorerDispatcher() .getDataExplorerManager() @@ -101,18 +107,20 @@ public DataLakeResourceV4(IDataExplorerQueryManagement dataExplorerQueryManageme @DeleteMapping(path = "/measurements/{measurementID}") @Operation(summary = "Remove data from a single measurement series with given id", tags = {"Data Lake"}, - responses = { - @ApiResponse(responseCode = "200", description = "Data from measurement series successfully removed"), - @ApiResponse(responseCode = "400", description = "Measurement series with given id not found")}) + responses = { + @ApiResponse(responseCode = "200", description = "Data from measurement series successfully removed"), + @ApiResponse(responseCode = "400", description = "Measurement series with given id not found") + }) public ResponseEntity deleteData( @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathVariable("measurementID") String measurementID , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @RequestParam(value = "startDate", required = false) Long startDate , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") - @RequestParam(value = "endDate", required = false) Long endDate) { + @RequestParam(value = "endDate", required = false) Long endDate + ) { - if (this.dataExplorerQueryManagement.deleteData(measurementID, startDate, endDate)){ + if (this.dataExplorerQueryManagement.deleteData(measurementID, startDate, endDate)) { return ok(); } else { return ResponseEntity @@ -124,23 +132,29 @@ public ResponseEntity deleteData( @DeleteMapping(path = "/measurements/{measurementID}/drop") @Operation(summary = "Drop a single measurement series with given id from Data Lake and " + "remove related event property", - tags = { - "Data Lake"}, - responses = { - @ApiResponse( - responseCode = "200", - description = "Measurement series successfully dropped from Data Lake"), - @ApiResponse( - responseCode = "400", - description = "Measurement series with given id or related event property not found")}) + tags = { + "Data Lake" + }, + responses = { + @ApiResponse( + responseCode = "200", + description = "Measurement series successfully dropped from Data Lake"), + @ApiResponse( + responseCode = "400", + description = "Measurement series with given id or related event property not found") + }) public ResponseEntity dropMeasurementSeries( @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) - @PathVariable("measurementID") String measurementID) { + @PathVariable("measurementID") String measureName + ) { + // Note: The measuremendID is not the measurmentId here, it is the measureName. See Issue #3400 + + removeMeasurementFromAllAssetModels(measureName); - boolean isSuccessDataLake = this.dataExplorerQueryManagement.deleteData(measurementID); + boolean isSuccessDataLake = this.dataExplorerQueryManagement.deleteData(measureName); if (isSuccessDataLake) { - boolean isSuccessEventProperty = this.dataExplorerSchemaManagement.deleteMeasurementByName(measurementID); + boolean isSuccessEventProperty = this.dataExplorerSchemaManagement.deleteMeasurementByName(measureName); if (isSuccessEventProperty) { return ok(); } else { @@ -155,21 +169,44 @@ public ResponseEntity dropMeasurementSeries( } } + /** + * This method removes the asset link from all asset models that are linked to the given measurement + * @param measureName of the measurement to be removed from the asset models + */ + private void removeMeasurementFromAllAssetModels(String measureName) { + try { + var assetModelHelper = new AssetModelHelper(); + + var measureToDelete = this.dataExplorerSchemaManagement.getByMeasureName(measureName); + if (measureToDelete != null) { + assetModelHelper.removeAssetLinkFromAllAssets(measureToDelete.getElementId()); + } else { + LOG.error("Measue with measureName {} not found", measureName); + } + + } catch (IOException e) { + LOG.error("Could not remove asset link from measurement series: {}", measureName, e); + } + } + @GetMapping(path = "/measurements", produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Get a list of all measurement series", tags = {"Data Lake"}, - responses = { - @ApiResponse( - responseCode = "200", - description = "array of stored measurement series", - content = @Content(array = @ArraySchema(schema = @Schema(implementation = DataLakeMeasure.class))))}) + responses = { + @ApiResponse( + responseCode = "200", + description = "array of stored measurement series", + content = @Content(array = @ArraySchema(schema = @Schema(implementation = DataLakeMeasure.class)))) + }) public ResponseEntity> getAll() { List allMeasurements = this.dataExplorerSchemaManagement.getAllMeasurements(); return ok(allMeasurements); } @GetMapping(path = "/measurements/{measurementId}/tags", produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity> getTagValues(@PathVariable("measurementId") String measurementId, - @RequestParam("fields") String fields) { + public ResponseEntity> getTagValues( + @PathVariable("measurementId") String measurementId, + @RequestParam("fields") String fields + ) { Map tagValues = dataExplorerQueryManagement.getTagValues(measurementId, fields); return ok(tagValues); } @@ -177,13 +214,15 @@ public ResponseEntity> getTagValues(@PathVariable("measureme @GetMapping(path = "/measurements/{measurementID}", produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Get data from a single measurement series by a given id", tags = {"Data Lake"}, - responses = { - @ApiResponse( - responseCode = "400", - description = "Measurement series with given id and requested query specification not found"), - @ApiResponse( - responseCode = "200", - description = "requested data", content = @Content(schema = @Schema(implementation = DataSeries.class)))}) + responses = { + @ApiResponse( + responseCode = "400", + description = "Measurement series with given id and requested query specification not found"), + @ApiResponse( + responseCode = "200", + description = "requested data", + content = @Content(schema = @Schema(implementation = DataSeries.class))) + }) public ResponseEntity getData( @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathVariable("measurementID") String measurementID @@ -232,7 +271,8 @@ public ResponseEntity getData( description = "the maximum amount of resulting events," + "when too high the query status is set to TOO_MUCH_DATA") @RequestParam(value = QP_MAXIMUM_AMOUNT_OF_EVENTS, required = false) Integer maximumAmountOfResults, - @RequestParam Map queryParams) { + @RequestParam Map queryParams + ) { if (!(checkProvidedQueryParams(queryParams))) { return badRequest(); @@ -264,13 +304,15 @@ public ResponseEntity> getData(@RequestBody List downloadData( @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathVariable("measurementID") String measurementID @@ -315,7 +357,8 @@ public ResponseEntity downloadData( description = "filter conditions (a comma-separated list of filter conditions" + "such as [field,operator,condition])") @RequestParam(value = QP_FILTER, required = false) String filter, - @RequestParam Map queryParams) { + @RequestParam Map queryParams + ) { if (!(checkProvidedQueryParams(queryParams))) { @@ -331,22 +374,24 @@ public ResponseEntity downloadData( sanitizedParams, outputFormat, isIgnoreMissingValues(missingValueBehaviour), - output); + output + ); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_OCTET_STREAM); headers.setContentDispositionFormData("attachment", "datalake." + outputFormat); return ResponseEntity.ok() - .headers(headers) - .body(streamingOutput); + .headers(headers) + .body(streamingOutput); } } @DeleteMapping(path = "/measurements") @Operation(summary = "Remove all stored measurement series from Data Lake", tags = {"Data Lake"}, - responses = { - @ApiResponse(responseCode = "200", description = "All measurement series successfully removed")}) + responses = { + @ApiResponse(responseCode = "200", description = "All measurement series successfully removed") + }) public ResponseEntity removeAll() { boolean isSuccess = this.dataExplorerQueryManagement.deleteAllData(); return ResponseEntity.ok(isSuccess);