From a581707ad0100816390ecb98443d87b769d4326b Mon Sep 17 00:00:00 2001 From: Philipp Zehnder Date: Thu, 15 Dec 2022 09:00:27 +0100 Subject: [PATCH] [#820] Activate checkstyle for streampipes-pipeline-management --- streampipes-pipeline-management/pom.xml | 9 + .../manager/assets/AssetExtractor.java | 4 +- .../manager/assets/AssetFetcher.java | 13 +- .../manager/assets/AssetManager.java | 5 +- .../manager/data/PipelineGraph.java | 7 +- .../manager/data/PipelineGraphBuilder.java | 78 ++++---- .../manager/data/PipelineGraphHelpers.java | 28 +-- .../manager/endpoint/EndpointItemFetcher.java | 56 +++--- .../manager/endpoint/EndpointItemParser.java | 14 +- .../ExtensionsServiceEndpointGenerator.java | 9 +- .../execution/http/GraphSubmitter.java | 10 +- .../execution/http/HttpRequestBuilder.java | 32 ++-- .../execution/http/PipelineExecutor.java | 85 +++++---- .../http/PipelineStorageService.java | 16 +- .../status/PipelineStatusManager.java | 66 ++++--- .../manager/file/FileConstants.java | 1 - .../streampipes/manager/file/FileHandler.java | 2 +- .../streampipes/manager/file/FileManager.java | 30 +-- .../PipelineElementEndpointHealthCheck.java | 3 +- .../manager/health/PipelineHealthCheck.java | 48 ++--- .../manager/info/SystemInfoProvider.java | 58 +++--- .../manager/info/VersionInfoProvider.java | 8 +- .../matching/ConnectionStorageHandler.java | 6 +- .../matching/DataSetGroundingSelector.java | 62 ++++--- .../manager/matching/FormatSelector.java | 68 +++---- .../manager/matching/GroundingBuilder.java | 44 ++--- .../manager/matching/GroundingSelector.java | 32 ++-- .../PipelineModificationGenerator.java | 25 ++- .../PipelineVerificationHandlerV2.java | 8 +- .../manager/matching/ProtocolSelector.java | 22 ++- .../EmptyRequirementsSelectorGenerator.java | 8 +- .../mapping/MappingPropertyCalculator.java | 2 +- .../RequirementsSelectorGenerator.java | 18 +- .../RequirementsSelectorGeneratorFactory.java | 11 +- .../output/AppendOutputSchemaGenerator.java | 16 +- .../output/CustomOutputSchemaGenerator.java | 8 +- .../CustomTransformOutputSchemaGenerator.java | 23 ++- .../output/FixedOutputSchemaGenerator.java | 2 +- .../output/ListOutputSchemaGenerator.java | 2 +- .../matching/output/OutputSchemaFactory.java | 10 +- .../output/OutputSchemaGenerator.java | 2 +- .../output/PropertyDuplicateRemover.java | 7 +- .../output/RenameOutputSchemaGenerator.java | 2 +- .../TransformOutputSchemaGenerator.java | 83 +++++---- .../UserDefinedOutputSchemaGenerator.java | 6 +- .../manager/matching/v2/AbstractMatcher.java | 50 ++--- .../manager/matching/v2/DatatypeMatch.java | 62 ++++--- .../matching/v2/DomainPropertyMatch.java | 51 +++-- .../matching/v2/ElementVerification.java | 52 +++--- .../manager/matching/v2/FormatMatch.java | 4 +- .../manager/matching/v2/GroundingMatch.java | 28 +-- .../matching/v2/ListPropertyMatch.java | 6 +- .../manager/matching/v2/Matcher.java | 8 +- .../matching/v2/MeasurementUnitMatch.java | 16 +- .../matching/v2/NestedPropertyMatch.java | 24 +-- .../matching/v2/PrimitivePropertyMatch.java | 66 +++---- .../manager/matching/v2/PropertyMatch.java | 12 +- .../manager/matching/v2/ProtocolMatch.java | 4 +- .../manager/matching/v2/SchemaMatch.java | 51 ++--- .../manager/matching/v2/StreamMatch.java | 90 ++++----- .../matching/v2/StreamQualityMatch.java | 30 +-- .../v2/pipeline/ApplyGroundingStep.java | 16 +- .../v2/pipeline/CheckCompletedVisitor.java | 30 ++- .../v2/pipeline/ComputeOutputStep.java | 14 +- .../v2/pipeline/PipelineValidationSteps.java | 14 +- .../v2/pipeline/SpValidationException.java | 3 +- .../pipeline/UpdateOutputStrategiesStep.java | 28 +-- .../UpdateStaticPropertiesVisitor.java | 27 ++- .../matching/v2/utils/MatchingUtils.java | 30 +-- .../pipeline/ExtensionsLogProvider.java | 12 +- .../ExtensionsServiceLogExecutor.java | 8 +- .../manager/operations/Operations.java | 20 +- .../manager/permission/PermissionManager.java | 4 +- .../pipeline/PipelineCacheManager.java | 22 +-- .../PipelineCanvasMetadataCacheManager.java | 22 +-- .../manager/pipeline/PipelineManager.java | 10 +- .../preview/ActivePipelinePreviews.java | 8 +- .../manager/preview/PipelinePreview.java | 28 +-- .../recommender/AllElementsProvider.java | 18 +- .../recommender/ElementRecommender.java | 83 +++++---- .../ContainerProvidedOptionsHandler.java | 13 +- .../PipelineElementRuntimeInfoFetcher.java | 30 +-- .../manager/selector/PropertyFinder.java | 2 +- .../selector/PropertyRequirementSelector.java | 4 +- .../manager/selector/PropertySelector.java | 49 ++--- .../selector/PropertySelectorGenerator.java | 12 +- .../manager/setup/AutoInstallation.java | 21 ++- .../setup/CouchDbInstallationStep.java | 69 +++---- .../setup/InstallationConfiguration.java | 76 ++++---- .../manager/setup/InstallationStep.java | 36 ++-- .../PipelineElementInstallationStep.java | 6 +- .../UserRegistrationInstallationStep.java | 98 +++++----- .../setup/design/DesignDocumentUtils.java | 1 + .../setup/design/UserDesignDocument.java | 16 +- .../setup/tasks/CreateAssetLinkTypeTask.java | 19 +- .../manager/storage/UserService.java | 8 +- .../template/AdapterTemplateHandler.java | 14 +- .../PipelineElementTemplateHandler.java | 3 +- .../PipelineElementTemplateVisitor.java | 26 ++- .../manager/template/PipelineGenerator.java | 16 +- .../template/PipelineTemplateGenerator.java | 22 ++- .../PipelineTemplateInvocationGenerator.java | 16 +- .../PipelineTemplateInvocationHandler.java | 27 +-- .../instances/DataLakePipelineTemplate.java | 19 +- .../template/instances/PipelineTemplate.java | 2 +- .../manager/topic/WildcardTopicGenerator.java | 12 +- .../manager/util/AuthTokenUtils.java | 8 +- .../util/PipelineVerificationUtils.java | 6 +- .../manager/util/TopicGenerator.java | 3 +- .../streampipes/manager/util/TreeUtils.java | 5 +- .../verification/DataProcessorVerifier.java | 4 +- .../verification/DataSinkVerifier.java | 75 ++++---- .../verification/DataStreamVerifier.java | 2 +- .../manager/verification/ElementVerifier.java | 25 ++- .../manager/verification/StorageState.java | 2 +- .../verification/extractor/TypeExtractor.java | 95 +++++----- .../messages/VerificationError.java | 11 +- .../messages/VerificationResult.java | 21 +-- .../messages/VerificationWarning.java | 6 +- .../runtime/HeartbeatMessageGenerator.java | 23 ++- .../runtime/SourceSchemaVerifier.java | 26 +-- .../structure/AbstractVerifier.java | 35 ++-- .../structure/GeneralVerifier.java | 31 ++-- .../structure/StreamVerifier.java | 29 ++- .../verification/structure/Verifier.java | 6 +- .../streampipes/manager/ThrowableCaptor.java | 22 +-- .../manager/assets/TestImagePathReplacer.java | 34 ++-- .../manager/file/TestFileManager.java | 58 +++--- .../TestPipelineValidationHandler.java | 58 +++--- .../manager/matching/TestRdfId.java | 6 +- .../matching/v2/ListPropertyMatchTest.java | 45 ++--- .../matching/v2/TestDatatypeMatch.java | 85 +++++---- .../matching/v2/TestDomainPropertyMatch.java | 63 ++++--- .../matching/v2/TestElementVerification.java | 24 +-- .../manager/matching/v2/TestFormatMatch.java | 55 +++--- .../matching/v2/TestGroundingMatch.java | 174 +++++++++--------- .../matching/v2/TestMeasurementUnitMatch.java | 3 +- .../v2/TestPrimitivePropertyMatch.java | 75 ++++---- .../matching/v2/TestProtocolMatch.java | 57 +++--- .../manager/matching/v2/TestSchemaMatch.java | 162 ++++++++-------- .../manager/matching/v2/TestUtils.java | 123 +++++++------ .../manager/pipeline/TestPipelineManager.java | 3 +- .../TestPipelinesContainingElements.java | 1 + .../selector/TestPropertyRenaming.java | 11 +- .../manager/selector/TestSelector.java | 15 +- .../selector/TestSelectorGenerator.java | 12 +- .../manager/selector/TestSelectorUtils.java | 56 +++--- 147 files changed, 2203 insertions(+), 1969 deletions(-) diff --git a/streampipes-pipeline-management/pom.xml b/streampipes-pipeline-management/pom.xml index 8f2ccb4c4c..831a8e2d55 100644 --- a/streampipes-pipeline-management/pom.xml +++ b/streampipes-pipeline-management/pom.xml @@ -165,4 +165,13 @@ test + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetExtractor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetExtractor.java index cbed80d330..9bbdc74e0e 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetExtractor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetExtractor.java @@ -45,11 +45,11 @@ private void replaceImagePaths() { private String makeAssetLocation(String appId) { return AssetConstants.ASSET_BASE_DIR - + File.separator + appId; + + File.separator + appId; } private String makeDocumentationAssetPath(String appId) { return makeAssetLocation(appId) + File.separator + GlobalStreamPipesConstants - .STD_DOCUMENTATION_NAME; + .STD_DOCUMENTATION_NAME; } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java index b8b39d0f96..553762018e 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java @@ -17,10 +17,11 @@ */ package org.apache.streampipes.manager.assets; -import org.apache.http.client.fluent.Request; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + +import org.apache.http.client.fluent.Request; import java.io.IOException; import java.io.InputStream; @@ -41,10 +42,10 @@ public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider, public InputStream fetchPipelineElementAssets() throws IOException, NoServiceEndpointsAvailableException { String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, spServiceUrlProvider).getEndpointResourceUrl(); return Request - .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX) - .execute() - .returnContent() - .asStream(); + .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX) + .execute() + .returnContent() + .asStream(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java index 8fd172b207..a570864ae7 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java @@ -17,11 +17,12 @@ */ package org.apache.streampipes.manager.assets; -import org.apache.commons.io.FileUtils; import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; +import org.apache.commons.io.FileUtils; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -46,7 +47,7 @@ public static byte[] getAsset(String appId, String assetName) throws IOException public static void storeAsset(SpServiceUrlProvider spServiceUrlProvider, String appId) throws IOException, NoServiceEndpointsAvailableException { InputStream assetStream = new AssetFetcher(spServiceUrlProvider, appId) - .fetchPipelineElementAssets(); + .fetchPipelineElementAssets(); new AssetExtractor(assetStream, appId).extractAssetContents(); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraph.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraph.java index d8739058c3..d097379f79 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraph.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraph.java @@ -19,11 +19,12 @@ package org.apache.streampipes.manager.data; import org.apache.streampipes.model.base.NamedStreamPipesEntity; + import org.jgrapht.graph.DirectedMultigraph; public class PipelineGraph extends DirectedMultigraph { - public PipelineGraph() { - super(String.class); - } + public PipelineGraph() { + super(String.class); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java index 5b5a934806..9e2d7351a1 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java @@ -28,52 +28,52 @@ public class PipelineGraphBuilder { - private final Pipeline pipeline; - private final List allPipelineElements; - private final List invocableElements; + private final Pipeline pipeline; + private final List allPipelineElements; + private final List invocableElements; - public PipelineGraphBuilder(Pipeline pipeline) { - this.pipeline = pipeline; - this.allPipelineElements = addAll(); - this.invocableElements = addInvocable(); - } - - private List addAll() { - List allElements = new ArrayList<>(); - allElements.addAll(pipeline.getStreams()); - allElements.addAll(addInvocable()); - return allElements; - } + public PipelineGraphBuilder(Pipeline pipeline) { + this.pipeline = pipeline; + this.allPipelineElements = addAll(); + this.invocableElements = addInvocable(); + } - private List addInvocable() { - List allElements = new ArrayList<>(); - allElements.addAll(pipeline.getSepas()); - allElements.addAll(pipeline.getActions()); - return allElements; - } + private List addAll() { + List allElements = new ArrayList<>(); + allElements.addAll(pipeline.getStreams()); + allElements.addAll(addInvocable()); + return allElements; + } + private List addInvocable() { + List allElements = new ArrayList<>(); + allElements.addAll(pipeline.getSepas()); + allElements.addAll(pipeline.getActions()); + return allElements; + } - public PipelineGraph buildGraph() { - PipelineGraph pipelineGraph = new PipelineGraph(); - allPipelineElements.forEach(pipelineGraph::addVertex); - for(NamedStreamPipesEntity source : allPipelineElements) { - List targets = findTargets(source.getDom()); - targets.forEach(t -> pipelineGraph.addEdge(source, t, createEdge(source, t))); - } + public PipelineGraph buildGraph() { + PipelineGraph pipelineGraph = new PipelineGraph(); + allPipelineElements.forEach(pipelineGraph::addVertex); - return pipelineGraph; + for (NamedStreamPipesEntity source : allPipelineElements) { + List targets = findTargets(source.getDom()); + targets.forEach(t -> pipelineGraph.addEdge(source, t, createEdge(source, t))); } - private List findTargets(String domId) { - return invocableElements - .stream() - .filter(i -> i.getConnectedTo().contains(domId)) - .collect(Collectors.toList()); - } + return pipelineGraph; + } - private String createEdge(NamedStreamPipesEntity sourceVertex, - NamedStreamPipesEntity targetVertex) { - return sourceVertex.getDom() + "-" + targetVertex.getDom(); - } + private List findTargets(String domId) { + return invocableElements + .stream() + .filter(i -> i.getConnectedTo().contains(domId)) + .collect(Collectors.toList()); + } + + private String createEdge(NamedStreamPipesEntity sourceVertex, + NamedStreamPipesEntity targetVertex) { + return sourceVertex.getDom() + "-" + targetVertex.getDom(); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java index c92b8723f7..0291c92030 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java @@ -26,20 +26,20 @@ public class PipelineGraphHelpers { - public static List findStreams(PipelineGraph pipelineGraph) { - return find(pipelineGraph, SpDataStream.class); - } + public static List findStreams(PipelineGraph pipelineGraph) { + return find(pipelineGraph, SpDataStream.class); + } - public static List findInvocableElements(PipelineGraph pipelineGraph) { - return find(pipelineGraph, InvocableStreamPipesEntity.class); - } + public static List findInvocableElements(PipelineGraph pipelineGraph) { + return find(pipelineGraph, InvocableStreamPipesEntity.class); + } - private static List find(PipelineGraph pipelineGraph, Class clazz) { - return pipelineGraph - .vertexSet() - .stream() - .filter(clazz::isInstance) - .map(clazz::cast) - .collect(Collectors.toList()); - } + private static List find(PipelineGraph pipelineGraph, Class clazz) { + return pipelineGraph + .vertexSet() + .stream() + .filter(clazz::isInstance) + .map(clazz::cast) + .collect(Collectors.toList()); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java index a6745d2b46..3cc0ae4c86 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java @@ -18,48 +18,52 @@ package org.apache.streampipes.manager.endpoint; -import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.http.client.fluent.Request; -import org.apache.http.message.BasicHeader; import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint; import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem; import org.apache.streampipes.serializers.json.JacksonSerializer; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.http.client.fluent.Request; +import org.apache.http.message.BasicHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.MediaType; + import java.io.IOException; import java.util.ArrayList; import java.util.List; public class EndpointItemFetcher { - Logger logger = LoggerFactory.getLogger(EndpointItemFetcher.class); + Logger logger = LoggerFactory.getLogger(EndpointItemFetcher.class); - private List extensionsServiceEndpoints; + private List extensionsServiceEndpoints; - public EndpointItemFetcher(List extensionsServiceEndpoints) { - this.extensionsServiceEndpoints = extensionsServiceEndpoints; - } + public EndpointItemFetcher(List extensionsServiceEndpoints) { + this.extensionsServiceEndpoints = extensionsServiceEndpoints; + } - public List getItems() { - List endpointItems = new ArrayList<>(); - extensionsServiceEndpoints.forEach(e -> endpointItems.addAll(getEndpointItems(e))); - return endpointItems; - } + public List getItems() { + List endpointItems = new ArrayList<>(); + extensionsServiceEndpoints.forEach(e -> endpointItems.addAll(getEndpointItems(e))); + return endpointItems; + } - private List getEndpointItems(ExtensionsServiceEndpoint e) { - try { - String result = Request.Get(e.getEndpointUrl()) - .addHeader(new BasicHeader("Accept", MediaType.APPLICATION_JSON)) - .connectTimeout(1000) - .execute() - .returnContent() - .asString(); + private List getEndpointItems(ExtensionsServiceEndpoint e) { + try { + String result = Request.Get(e.getEndpointUrl()) + .addHeader(new BasicHeader("Accept", MediaType.APPLICATION_JSON)) + .connectTimeout(1000) + .execute() + .returnContent() + .asString(); - return JacksonSerializer.getObjectMapper().readValue(result, new TypeReference>() {}); - } catch (IOException e1) { - logger.warn("Processing Element Descriptions could not be fetched from RDF endpoint: " + e.getEndpointUrl()); - return new ArrayList<>(); - } + return JacksonSerializer.getObjectMapper() + .readValue(result, new TypeReference>() { + }); + } catch (IOException e1) { + logger.warn("Processing Element Descriptions could not be fetched from RDF endpoint: " + e.getEndpointUrl()); + return new ArrayList<>(); } + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemParser.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemParser.java index f2a42806f5..d019561c8c 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemParser.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemParser.java @@ -17,13 +17,15 @@ */ package org.apache.streampipes.manager.endpoint; -import org.apache.http.client.fluent.Request; import org.apache.streampipes.manager.operations.Operations; import org.apache.streampipes.model.message.Message; import org.apache.streampipes.model.message.NotificationType; import org.apache.streampipes.model.message.Notifications; +import org.apache.http.client.fluent.Request; + import javax.ws.rs.core.MediaType; + import java.io.IOException; import java.net.URLDecoder; @@ -45,10 +47,10 @@ public Message parseAndAddEndpointItem(String url, private String parseURIContent(String url) throws IOException { return Request - .Get(url) - .addHeader("Accept", MediaType.APPLICATION_JSON) - .execute() - .returnContent() - .asString(); + .Get(url) + .addHeader("Accept", MediaType.APPLICATION_JSON) + .execute() + .returnContent() + .asString(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java index 8d430e46ce..59d1868435 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java @@ -22,6 +22,7 @@ import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +57,7 @@ public String getEndpointBaseUrl() throws NoServiceEndpointsAvailableException { private List getServiceEndpoints() { return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(DefaultSpServiceGroups.EXT, true, - Collections.singletonList(this.spServiceUrlProvider.getServiceTag(appId).asString())); + Collections.singletonList(this.spServiceUrlProvider.getServiceTag(appId).asString())); } private String selectService() throws NoServiceEndpointsAvailableException { @@ -64,8 +65,10 @@ private String selectService() throws NoServiceEndpointsAvailableException { if (serviceEndpoints.size() > 0) { return getServiceEndpoints().get(0); } else { - LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, this.spServiceUrlProvider.getServiceTag(appId).asString()); - throw new NoServiceEndpointsAvailableException("Could not find any matching service endpoints - are all software components running?"); + LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, + this.spServiceUrlProvider.getServiceTag(appId).asString()); + throw new NoServiceEndpointsAvailableException( + "Could not find any matching service endpoints - are all software components running?"); } } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java index e3db01c559..59598fa146 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java @@ -23,6 +23,7 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.pipeline.PipelineElementStatus; import org.apache.streampipes.model.pipeline.PipelineOperationStatus; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ public class GraphSubmitter { private String pipelineId; private String pipelineName; - private final static Logger LOG = LoggerFactory.getLogger(GraphSubmitter.class); + private static final Logger LOG = LoggerFactory.getLogger(GraphSubmitter.class); public GraphSubmitter(String pipelineId, String pipelineName, @@ -59,8 +60,8 @@ public PipelineOperationStatus invokeGraphs() { graphs.forEach(g -> status.addPipelineElementStatus(performInvocation(g))); if (status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess)) { dataSets.forEach(dataSet -> - status.addPipelineElementStatus - (performInvocation(dataSet))); + status.addPipelineElementStatus + (performInvocation(dataSet))); } status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess)); @@ -124,7 +125,8 @@ private PipelineElementStatus performDetach(InvocableStreamPipesEntity entity) { } private PipelineElementStatus performDetach(SpDataSet dataset) { - String endpointUrl = dataset.getSelectedEndpointUrl() + "/" + dataset.getCorrespondingAdapterId() + "/" + dataset.getDatasetInvocationId(); + String endpointUrl = dataset.getSelectedEndpointUrl() + "/" + dataset.getCorrespondingAdapterId() + "/" + + dataset.getDatasetInvocationId(); return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).detach(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java index 4dd5184e9c..d9dd5a3130 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java @@ -18,14 +18,15 @@ package org.apache.streampipes.manager.execution.http; -import com.google.gson.JsonSyntaxException; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; -import org.apache.http.entity.ContentType; import org.apache.streampipes.manager.util.AuthTokenUtils; import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.pipeline.PipelineElementStatus; import org.apache.streampipes.serializers.json.JacksonSerializer; + +import com.google.gson.JsonSyntaxException; +import org.apache.http.client.fluent.Request; +import org.apache.http.client.fluent.Response; +import org.apache.http.entity.ContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +38,7 @@ public class HttpRequestBuilder { private final String endpointUrl; private String pipelineId; - private final static Logger LOG = LoggerFactory.getLogger(HttpRequestBuilder.class); + private static final Logger LOG = LoggerFactory.getLogger(HttpRequestBuilder.class); public HttpRequestBuilder(NamedStreamPipesEntity payload, String endpointUrl, @@ -52,11 +53,11 @@ public PipelineElementStatus invoke() { try { String jsonDocument = toJson(); Response httpResp = - Request.Post(endpointUrl) - .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId)) - .bodyString(jsonDocument, ContentType.APPLICATION_JSON) - .connectTimeout(10000) - .execute(); + Request.Post(endpointUrl) + .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId)) + .bodyString(jsonDocument, ContentType.APPLICATION_JSON) + .connectTimeout(10000) + .execute(); return handleResponse(httpResp); } catch (Exception e) { LOG.error("Could not perform invocation request", e); @@ -67,8 +68,8 @@ public PipelineElementStatus invoke() { public PipelineElementStatus detach() { try { Response httpResp = Request.Delete(endpointUrl) - .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId)) - .connectTimeout(10000).execute(); + .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId)) + .connectTimeout(10000).execute(); return handleResponse(httpResp); } catch (Exception e) { LOG.error("Could not stop pipeline {}", endpointUrl, e); @@ -79,8 +80,8 @@ public PipelineElementStatus detach() { private PipelineElementStatus handleResponse(Response httpResp) throws JsonSyntaxException, IOException { String resp = httpResp.returnContent().asString(); org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer - .getObjectMapper() - .readValue(resp, org.apache.streampipes.model.Response.class); + .getObjectMapper() + .readValue(resp, org.apache.streampipes.model.Response.class); return convert(streamPipesResp); } @@ -89,7 +90,8 @@ private String toJson() throws Exception { } private PipelineElementStatus convert(org.apache.streampipes.model.Response response) { - return new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(), response.getOptionalMessage()); + return new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(), + response.getOptionalMessage()); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java index d484bcea65..9ee3b8467c 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java @@ -25,7 +25,6 @@ import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; import org.apache.streampipes.manager.execution.status.PipelineStatusManager; -import org.apache.streampipes.resource.management.secret.SecretProvider; import org.apache.streampipes.manager.util.TemporaryGraphStorage; import org.apache.streampipes.model.SpDataSet; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; @@ -39,12 +38,14 @@ import org.apache.streampipes.model.pipeline.PipelineElementStatus; import org.apache.streampipes.model.pipeline.PipelineHealthStatus; import org.apache.streampipes.model.pipeline.PipelineOperationStatus; +import org.apache.streampipes.resource.management.secret.SecretProvider; import org.apache.streampipes.storage.api.IPipelineStorage; import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups; import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + import org.lightcouch.DocumentConflictException; import java.util.ArrayList; @@ -76,11 +77,11 @@ public PipelineOperationStatus startPipeline() { List secs = pipeline.getActions(); List dataSets = pipeline - .getStreams() - .stream() - .filter(s -> s instanceof SpDataSet) - .map(s -> new SpDataSet((SpDataSet) s)) - .collect(Collectors.toList()); + .getStreams() + .stream() + .filter(s -> s instanceof SpDataSet) + .map(s -> new SpDataSet((SpDataSet) s)) + .collect(Collectors.toList()); List failedServices = new ArrayList<>(); @@ -99,21 +100,21 @@ public PipelineOperationStatus startPipeline() { decryptSecrets(graphs); - graphs.forEach(g -> { - try { - g.setSelectedEndpointUrl(findSelectedEndpoint(g)); - g.setCorrespondingPipeline(pipeline.getPipelineId()); - } catch (NoServiceEndpointsAvailableException e) { - failedServices.add(g); - } - }); + graphs.forEach(g -> { + try { + g.setSelectedEndpointUrl(findSelectedEndpoint(g)); + g.setCorrespondingPipeline(pipeline.getPipelineId()); + } catch (NoServiceEndpointsAvailableException e) { + failedServices.add(g); + } + }); PipelineOperationStatus status; if (failedServices.size() == 0) { status = new GraphSubmitter(pipeline.getPipelineId(), - pipeline.getName(), graphs, dataSets) - .invokeGraphs(); + pipeline.getName(), graphs, dataSets) + .invokeGraphs(); encryptSecrets(graphs); @@ -121,7 +122,9 @@ public PipelineOperationStatus startPipeline() { storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets); PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(), - new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STARTED.title(), PipelineStatusMessageType.PIPELINE_STARTED.description())); + new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), + PipelineStatusMessageType.PIPELINE_STARTED.title(), + PipelineStatusMessageType.PIPELINE_STARTED.description())); if (storeStatus) { pipeline.setHealthStatus(PipelineHealthStatus.OK); @@ -130,23 +133,23 @@ public PipelineOperationStatus startPipeline() { } } else { List pe = failedServices.stream().map(fs -> - new PipelineElementStatus(fs.getElementId(), - fs.getName(), - false, - "No active supporting service found")).collect(Collectors.toList()); + new PipelineElementStatus(fs.getElementId(), + fs.getName(), + false, + "No active supporting service found")).collect(Collectors.toList()); status = new PipelineOperationStatus(pipeline.getPipelineId(), - pipeline.getName(), - "Could not start pipeline " + pipeline.getName() + ".", - pe); + pipeline.getName(), + "Could not start pipeline " + pipeline.getName() + ".", + pe); } return status; } private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { return new ExtensionsServiceEndpointGenerator( - g.getAppId(), - ExtensionsServiceEndpointUtils.getPipelineElementType(g)) - .getEndpointResourceUrl(); + g.getAppId(), + ExtensionsServiceEndpointUtils.getPipelineElementType(g)) + .getEndpointResourceUrl(); } private String findSelectedEndpoint(SpDataSet ds) throws NoServiceEndpointsAvailableException { @@ -155,12 +158,14 @@ private String findSelectedEndpoint(SpDataSet ds) throws NoServiceEndpointsAvail return getConnectMasterSourcesUrl(); } else { return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.DATA_SET) - .getEndpointResourceUrl(); + .getEndpointResourceUrl(); } } private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableException { - List connectMasterEndpoints = SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(DefaultSpServiceGroups.CORE, true, Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString())); + List connectMasterEndpoints = SpServiceDiscovery.getServiceDiscovery() + .getServiceEndpoints(DefaultSpServiceGroups.CORE, true, + Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString())); if (connectMasterEndpoints.size() > 0) { return connectMasterEndpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT; } else { @@ -170,11 +175,11 @@ private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableEx private void updateGroupIds(InvocableStreamPipesEntity entity) { entity.getInputStreams() - .stream() - .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) - .map(is -> is.getEventGrounding().getTransportProtocol()) - .map(KafkaTransportProtocol.class::cast) - .forEach(tp -> tp.setGroupId(Utils.filterSpecialChar(pipeline.getName()) + MD5.crypt(tp.getElementId()))); + .stream() + .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) + .map(is -> is.getEventGrounding().getTransportProtocol()) + .map(KafkaTransportProtocol.class::cast) + .forEach(tp -> tp.setGroupId(Utils.filterSpecialChar(pipeline.getName()) + MD5.crypt(tp.getElementId()))); } private void decryptSecrets(List graphs) { @@ -190,15 +195,15 @@ public PipelineOperationStatus stopPipeline() { List dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId()); PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), - pipeline.getName(), graphs, dataSets) - .detachGraphs(); + pipeline.getName(), graphs, dataSets) + .detachGraphs(); if (status.isSuccess()) { PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(), - new PipelineStatusMessage(pipeline.getPipelineId(), - System.currentTimeMillis(), - PipelineStatusMessageType.PIPELINE_STOPPED.title(), - PipelineStatusMessageType.PIPELINE_STOPPED.description())); + new PipelineStatusMessage(pipeline.getPipelineId(), + System.currentTimeMillis(), + PipelineStatusMessageType.PIPELINE_STOPPED.title(), + PipelineStatusMessageType.PIPELINE_STOPPED.description())); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java index 7603776968..5c42b5eb06 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java @@ -52,10 +52,10 @@ public void addPipeline() { private void preparePipeline() { PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph(); List graphs = pipelineGraph - .vertexSet() - .stream() - .filter(v -> v instanceof InvocableStreamPipesEntity).map(v -> (InvocableStreamPipesEntity) v) - .collect(Collectors.toList()); + .vertexSet() + .stream() + .filter(v -> v instanceof InvocableStreamPipesEntity).map(v -> (InvocableStreamPipesEntity) v) + .collect(Collectors.toList()); encryptSecrets(graphs); List secs = filter(graphs, DataSinkInvocation.class); @@ -77,9 +77,9 @@ private void encryptSecrets(Pipeline pipeline) { private List filter(List graphs, Class clazz) { return graphs - .stream() - .filter(clazz::isInstance) - .map(clazz::cast) - .collect(Collectors.toList()); + .stream() + .filter(clazz::isInstance) + .map(clazz::cast) + .collect(Collectors.toList()); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/status/PipelineStatusManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/status/PipelineStatusManager.java index 69f065ab75..6e6e2a7d52 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/status/PipelineStatusManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/status/PipelineStatusManager.java @@ -18,41 +18,47 @@ package org.apache.streampipes.manager.execution.status; +import org.apache.streampipes.model.message.PipelineStatusMessage; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.streampipes.model.message.PipelineStatusMessage; - public class PipelineStatusManager { - private static Map> pipelineStatusMessages = new HashMap<>(); - - public static void addPipelineStatus(String pipelineId, PipelineStatusMessage message) { - if (isInitialized(pipelineId)) - pipelineStatusMessages.get(pipelineId).add(message); - else { - List statusMessageList = new ArrayList<>(); - statusMessageList.add(message); - pipelineStatusMessages.put(pipelineId, statusMessageList); - } - - } - - private static boolean isInitialized(String pipelineId) { - return pipelineStatusMessages.containsKey(pipelineId); - } - - public static List getPipelineStatus(String pipelineId) { - if (!isInitialized(pipelineId)) return new ArrayList<>(); - else return pipelineStatusMessages.get(pipelineId); - } - - public static List getPipelineStatus(String pipelineId, int numberOfLatestEntries) { - List messages = getPipelineStatus(pipelineId); - int statusMessageCount = messages.size(); - if (statusMessageCount <= numberOfLatestEntries) return messages; - else return messages.subList(messages.size()-numberOfLatestEntries, messages.size()); - } + private static final Map> pipelineStatusMessages = new HashMap<>(); + + public static void addPipelineStatus(String pipelineId, PipelineStatusMessage message) { + if (isInitialized(pipelineId)) { + pipelineStatusMessages.get(pipelineId).add(message); + } else { + List statusMessageList = new ArrayList<>(); + statusMessageList.add(message); + pipelineStatusMessages.put(pipelineId, statusMessageList); + } + + } + + private static boolean isInitialized(String pipelineId) { + return pipelineStatusMessages.containsKey(pipelineId); + } + + public static List getPipelineStatus(String pipelineId) { + if (!isInitialized(pipelineId)) { + return new ArrayList<>(); + } else { + return pipelineStatusMessages.get(pipelineId); + } + } + + public static List getPipelineStatus(String pipelineId, int numberOfLatestEntries) { + List messages = getPipelineStatus(pipelineId); + int statusMessageCount = messages.size(); + if (statusMessageCount <= numberOfLatestEntries) { + return messages; + } else { + return messages.subList(messages.size() - numberOfLatestEntries, messages.size()); + } + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileConstants.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileConstants.java index 6e65508f47..7ed4913152 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileConstants.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileConstants.java @@ -20,7 +20,6 @@ import org.apache.streampipes.config.backend.BackendConfig; public class FileConstants { - public static final String FILES_BASE_DIR = BackendConfig.INSTANCE.getFilesDir(); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileHandler.java index af2f18b167..b5403a6f94 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileHandler.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileHandler.java @@ -49,6 +49,6 @@ private File makeFile(String filename) { private String makeFileLocation() { return FileConstants.FILES_BASE_DIR - + File.separator; + + File.separator; } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileManager.java index 01851cf381..6ba0f85d11 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileManager.java @@ -17,12 +17,13 @@ */ package org.apache.streampipes.manager.file; -import org.apache.commons.io.input.BOMInputStream; import org.apache.streampipes.model.file.FileMetadata; import org.apache.streampipes.sdk.helpers.Filetypes; import org.apache.streampipes.storage.api.IFileMetadataStorage; import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.commons.io.input.BOMInputStream; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -34,7 +35,7 @@ public class FileManager { public static List getAllFiles() { - return getAllFiles(null); + return getAllFiles(null); } public static List getAllFiles(String filetypes) { @@ -45,15 +46,15 @@ public static List getAllFiles(String filetypes) { /** * Store a file in the internal file storage. * For csv files the bom is removed - * @param user who created the file + * + * @param user who created the file * @param filename * @param fileInputStream content of file * @return - * @throws IOException */ public static FileMetadata storeFile(String user, - String filename, - InputStream fileInputStream) throws IOException { + String filename, + InputStream fileInputStream) throws IOException { String filetype = filename.substring(filename.lastIndexOf(".") + 1); @@ -78,6 +79,7 @@ public static File getFile(String filename) { /** * Remove Byte Order Mark (BOM) from csv files + * * @param fileInputStream * @param filetype * @return @@ -96,9 +98,9 @@ private static void storeFileMetadata(FileMetadata fileMetadata) { private static IFileMetadataStorage getFileMetadataStorage() { return StorageDispatcher - .INSTANCE - .getNoSqlStore() - .getFileMetadataStorage(); + .INSTANCE + .getNoSqlStore() + .getFileMetadataStorage(); } private static FileMetadata makeFileMetadata(String user, @@ -122,10 +124,10 @@ private static String makeInternalFilename(String filetype) { private static List filterFiletypes(List allFiles, String filetypes) { return allFiles - .stream() - .filter(fileMetadata -> Arrays - .stream(filetypes.split(",")) - .anyMatch(ft -> ft.equals(fileMetadata.getFiletype()))) - .collect(Collectors.toList()); + .stream() + .filter(fileMetadata -> Arrays + .stream(filetypes.split(",")) + .anyMatch(ft -> ft.equals(fileMetadata.getFiletype()))) + .collect(Collectors.toList()); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java index 12b6bb6e57..8d9dc937e0 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java @@ -17,9 +17,10 @@ */ package org.apache.streampipes.manager.health; +import org.apache.streampipes.serializers.json.JacksonSerializer; + import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.http.client.fluent.Request; -import org.apache.streampipes.serializers.json.JacksonSerializer; import java.io.IOException; import java.util.Arrays; diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java index ecdd303866..335d76c46a 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java @@ -29,13 +29,18 @@ import org.apache.streampipes.model.pipeline.PipelineHealthStatus; import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -81,16 +86,17 @@ public void checkAndRestorePipelineElements() { addFailedAttemptNotification(pipelineNotifications, graph); increaseFailedAttempt(instanceId); LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})", - graph.getName(), - pipeline.getName(), - failedRestartAttempts.get(instanceId), - MAX_FAILED_ATTEMPTS); + graph.getName(), + pipeline.getName(), + failedRestartAttempts.get(instanceId), + MAX_FAILED_ATTEMPTS); } else { recoveredInstances.add(instanceId); addSuccessfulRestoreNotification(pipelineNotifications, graph); resetFailedAttempts(instanceId); graph.setSelectedEndpointUrl(endpointUrl); - LOG.info("Successfully restored pipeline element {} of pipeline {}", graph.getName(), pipeline.getName()); + LOG.info("Successfully restored pipeline element {} of pipeline {}", graph.getName(), + pipeline.getName()); } } } @@ -137,23 +143,23 @@ private void increaseFailedAttempt(String instanceId) { private void addSuccessfulRestoreNotification(List pipelineNotifications, InvocableStreamPipesEntity graph) { pipelineNotifications.add(getCurrentDatetime() - + "Pipeline element '" - + graph.getName() - + "' was not available and was successfully restored."); + + "Pipeline element '" + + graph.getName() + + "' was not available and was successfully restored."); } private void addFailedAttemptNotification(List pipelineNotifications, InvocableStreamPipesEntity graph) { pipelineNotifications.add(getCurrentDatetime() - + "Pipeline element '" - + graph.getName() - + "' was not available and could not be restored."); + + "Pipeline element '" + + graph.getName() + + "' was not available and could not be restored."); } private String getCurrentDatetime() { DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss"); LocalDateTime now = LocalDateTime.now(); - return "[" +dtf.format(now) + "] "; + return "[" + dtf.format(now) + "] "; } private String extractInstanceId(InvocableStreamPipesEntity graph) { @@ -177,7 +183,7 @@ private List findRunningInstances(Set endpoints) { private Map> generateEndpointMap() { Map> endpointMap = new HashMap<>(); TemporaryGraphStorage.graphStorage.forEach((pipelineId, graphs) -> - graphs.forEach(graph -> addEndpoint(endpointMap, graph))); + graphs.forEach(graph -> addEndpoint(endpointMap, graph))); return endpointMap; } @@ -200,12 +206,12 @@ public void run() { private List getRunningPipelines() { return StorageDispatcher - .INSTANCE - .getNoSqlStore() - .getPipelineStorageAPI() - .getAllPipelines() - .stream() - .filter(Pipeline::isRunning) - .collect(Collectors.toList()); + .INSTANCE + .getNoSqlStore() + .getPipelineStorageAPI() + .getAllPipelines() + .stream() + .filter(Pipeline::isRunning) + .collect(Collectors.toList()); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/info/SystemInfoProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/info/SystemInfoProvider.java index b0da8d111a..715191e906 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/info/SystemInfoProvider.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/info/SystemInfoProvider.java @@ -21,35 +21,35 @@ public class SystemInfoProvider { - public SystemInfo getSystemInfo() { - SystemInfo systemInfo = new SystemInfo(); - - systemInfo.setJavaVmName(getProperty("java.vm.name")); - systemInfo.setJavaVmVendor(getProperty("java.vm.vendor")); - systemInfo.setJavaVmVersion(getProperty("java.vm.version")); - systemInfo.setJavaRuntimeName(getProperty("java.runtime.name")); - systemInfo.setJavaRuntimeVersion(getProperty("java.runtime.version")); - systemInfo.setOsName(getProperty("os.name")); - systemInfo.setOsVersion(getProperty("os.version")); - systemInfo.setCpu(getProperty("sun.cpu.isalist")); - - Runtime runtime = Runtime.getRuntime(); - systemInfo.setTotalMemory(runtime.totalMemory()); - systemInfo.setFreeMemory(runtime.freeMemory()); - systemInfo.setTotalMemoryKB(runtime.totalMemory() / 1024); - systemInfo.setFreeMemoryKB(runtime.freeMemory() / 1024); - - return systemInfo; - } + public SystemInfo getSystemInfo() { + SystemInfo systemInfo = new SystemInfo(); + + systemInfo.setJavaVmName(getProperty("java.vm.name")); + systemInfo.setJavaVmVendor(getProperty("java.vm.vendor")); + systemInfo.setJavaVmVersion(getProperty("java.vm.version")); + systemInfo.setJavaRuntimeName(getProperty("java.runtime.name")); + systemInfo.setJavaRuntimeVersion(getProperty("java.runtime.version")); + systemInfo.setOsName(getProperty("os.name")); + systemInfo.setOsVersion(getProperty("os.version")); + systemInfo.setCpu(getProperty("sun.cpu.isalist")); + + Runtime runtime = Runtime.getRuntime(); + systemInfo.setTotalMemory(runtime.totalMemory()); + systemInfo.setFreeMemory(runtime.freeMemory()); + systemInfo.setTotalMemoryKB(runtime.totalMemory() / 1024); + systemInfo.setFreeMemoryKB(runtime.freeMemory() / 1024); + + return systemInfo; + } - private String getProperty(String key) { - String propValue = null; - try { - propValue = System.getProperty(key, ""); - } catch (Exception ex) { - propValue = "unknown"; - } - return propValue; + private String getProperty(String key) { + String propValue = null; + try { + propValue = System.getProperty(key, ""); + } catch (Exception ex) { + propValue = "unknown"; } - + return propValue; } + +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/info/VersionInfoProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/info/VersionInfoProvider.java index 8d37c63a3e..4a731a973c 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/info/VersionInfoProvider.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/info/VersionInfoProvider.java @@ -22,11 +22,11 @@ public class VersionInfoProvider { public VersionInfo makeVersionInfo() { - VersionInfo versionInfo = new VersionInfo(); - versionInfo.setBackendVersion(getClass().getPackage().getImplementationVersion()); + VersionInfo versionInfo = new VersionInfo(); + versionInfo.setBackendVersion(getClass().getPackage().getImplementationVersion()); - // TODO add versions of other services + // TODO add versions of other services - return versionInfo; + return versionInfo; } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionStorageHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionStorageHandler.java index afa21cb27a..3eb64c6d5c 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionStorageHandler.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionStorageHandler.java @@ -45,9 +45,9 @@ public void storeConnections() { pipeline.getActions().forEach(sink -> findConnections(sink, connections)); connections.forEach(connection -> StorageDispatcher.INSTANCE - .getNoSqlStore() - .getConnectionStorageApi() - .addConnection(connection)); + .getNoSqlStore() + .getConnectionStorageApi() + .addConnection(connection)); } private void findConnections(NamedStreamPipesEntity target, diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/DataSetGroundingSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/DataSetGroundingSelector.java index e2e7640361..75e6f5383d 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/DataSetGroundingSelector.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/DataSetGroundingSelector.java @@ -17,14 +17,22 @@ */ package org.apache.streampipes.manager.matching; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.streampipes.config.backend.BackendConfig; import org.apache.streampipes.config.backend.SpProtocol; import org.apache.streampipes.manager.util.TopicGenerator; import org.apache.streampipes.model.SpDataSet; -import org.apache.streampipes.model.grounding.*; +import org.apache.streampipes.model.grounding.EventGrounding; +import org.apache.streampipes.model.grounding.JmsTransportProtocol; +import org.apache.streampipes.model.grounding.KafkaTransportProtocol; +import org.apache.streampipes.model.grounding.MqttTransportProtocol; +import org.apache.streampipes.model.grounding.NatsTransportProtocol; +import org.apache.streampipes.model.grounding.SimpleTopicDefinition; +import org.apache.streampipes.model.grounding.TopicDefinition; +import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.model.message.DataSetModificationMessage; +import org.apache.commons.lang3.RandomStringUtils; + import java.util.Collections; public class DataSetGroundingSelector { @@ -42,42 +50,42 @@ public DataSetModificationMessage selectGrounding() { TopicDefinition topicDefinition = new SimpleTopicDefinition(topic); SpProtocol prioritizedProtocol = - BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0); + BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0); if (isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) { outputGrounding.setTransportProtocol(makeTransportProtocol( - BackendConfig.INSTANCE.getJmsHost(), - BackendConfig.INSTANCE.getJmsPort(), - topicDefinition, - JmsTransportProtocol.class + BackendConfig.INSTANCE.getJmsHost(), + BackendConfig.INSTANCE.getJmsPort(), + topicDefinition, + JmsTransportProtocol.class )); - } else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)){ + } else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)) { outputGrounding.setTransportProtocol(makeTransportProtocol( - BackendConfig.INSTANCE.getKafkaHost(), - BackendConfig.INSTANCE.getKafkaPort(), - topicDefinition, - KafkaTransportProtocol.class + BackendConfig.INSTANCE.getKafkaHost(), + BackendConfig.INSTANCE.getKafkaPort(), + topicDefinition, + KafkaTransportProtocol.class )); } else if (isPrioritized(prioritizedProtocol, MqttTransportProtocol.class)) { outputGrounding.setTransportProtocol(makeTransportProtocol( - BackendConfig.INSTANCE.getMqttHost(), - BackendConfig.INSTANCE.getMqttPort(), - topicDefinition, - MqttTransportProtocol.class + BackendConfig.INSTANCE.getMqttHost(), + BackendConfig.INSTANCE.getMqttPort(), + topicDefinition, + MqttTransportProtocol.class )); } else if (isPrioritized(prioritizedProtocol, NatsTransportProtocol.class)) { - outputGrounding.setTransportProtocol(makeTransportProtocol( - BackendConfig.INSTANCE.getNatsHost(), - BackendConfig.INSTANCE.getNatsPort(), - topicDefinition, - NatsTransportProtocol.class - )); - } + outputGrounding.setTransportProtocol(makeTransportProtocol( + BackendConfig.INSTANCE.getNatsHost(), + BackendConfig.INSTANCE.getNatsPort(), + topicDefinition, + NatsTransportProtocol.class + )); + } outputGrounding.setTransportFormats(Collections - .singletonList(spDataSet.getSupportedGrounding().getTransportFormats().get(0))); + .singletonList(spDataSet.getSupportedGrounding().getTransportFormats().get(0))); - return new DataSetModificationMessage(outputGrounding,RandomStringUtils.randomAlphanumeric(10)); + return new DataSetModificationMessage(outputGrounding, RandomStringUtils.randomAlphanumeric(10)); } public static Boolean isPrioritized(SpProtocol prioritizedProtocol, @@ -85,8 +93,8 @@ public static Boolean isPrioritized(SpProtocol prioritizedProtocol, return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName()); } - private static T makeTransportProtocol(String hostname, int port, TopicDefinition topicDefinition, - Class protocolClass) { + private static T makeTransportProtocol(String hostname, int port, TopicDefinition topicDefinition, + Class protocolClass) { T tpOut = null; if (protocolClass.equals(KafkaTransportProtocol.class)) { KafkaTransportProtocol tp = new KafkaTransportProtocol(); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java index 16168052ea..b51d2d0a66 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java @@ -33,45 +33,45 @@ public class FormatSelector extends GroundingSelector { - public FormatSelector(NamedStreamPipesEntity source, Set targets) { - super(source, targets); - } + public FormatSelector(NamedStreamPipesEntity source, Set targets) { + super(source, targets); + } - public TransportFormat getTransportFormat() { + public TransportFormat getTransportFormat() { - if (source instanceof SpDataStream) { - return ((SpDataStream) source) - .getEventGrounding() - .getTransportFormats() - .get(0); - } else { - List prioritizedFormats = - BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedFormats(); + if (source instanceof SpDataStream) { + return ((SpDataStream) source) + .getEventGrounding() + .getTransportFormats() + .get(0); + } else { + List prioritizedFormats = + BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedFormats(); - List supportedFormats = prioritizedFormats - .stream() - .filter(pf -> supportsFormat(pf.getMessageFormat())).collect(Collectors.toList()); + List supportedFormats = prioritizedFormats + .stream() + .filter(pf -> supportsFormat(pf.getMessageFormat())).collect(Collectors.toList()); - if (supportedFormats.size() > 0) { - return new TransportFormat(supportedFormats.get(0).getMessageFormat()); - } else { - return new TransportFormat(MessageFormat.Json); - } - } + if (supportedFormats.size() > 0) { + return new TransportFormat(supportedFormats.get(0).getMessageFormat()); + } else { + return new TransportFormat(MessageFormat.Json); + } } + } - public boolean supportsFormat(String format) { - List elements = buildInvocables(); - return elements - .stream() - .allMatch(e -> e - .getSupportedGrounding() - .getTransportFormats() - .stream() - .anyMatch(s -> rdfTypesAsString(s.getRdfType()).contains(format))); - } + public boolean supportsFormat(String format) { + List elements = buildInvocables(); + return elements + .stream() + .allMatch(e -> e + .getSupportedGrounding() + .getTransportFormats() + .stream() + .anyMatch(s -> rdfTypesAsString(s.getRdfType()).contains(format))); + } - private List rdfTypesAsString(List uri) { - return uri.stream().map(URI::toString).collect(Collectors.toList()); - } + private List rdfTypesAsString(List uri) { + return uri.stream().map(URI::toString).collect(Collectors.toList()); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingBuilder.java index cd7ff56f0c..ee683afd93 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingBuilder.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingBuilder.java @@ -29,26 +29,26 @@ public class GroundingBuilder { - private NamedStreamPipesEntity source; - private Set targets; - - public GroundingBuilder(NamedStreamPipesEntity source, Set targets) { - this.source = source; - this.targets = targets; - } - - public EventGrounding getEventGrounding() { - EventGrounding grounding = new EventGrounding(); - grounding.setTransportFormats(Collections.singletonList(getFormat())); - grounding.setTransportProtocols(Collections.singletonList(getProtocol())); - return grounding; - } - - private TransportFormat getFormat() { - return new FormatSelector(source, targets).getTransportFormat(); - } - - private TransportProtocol getProtocol() { - return new ProtocolSelector(source, targets).getPreferredProtocol(); - } + private NamedStreamPipesEntity source; + private Set targets; + + public GroundingBuilder(NamedStreamPipesEntity source, Set targets) { + this.source = source; + this.targets = targets; + } + + public EventGrounding getEventGrounding() { + EventGrounding grounding = new EventGrounding(); + grounding.setTransportFormats(Collections.singletonList(getFormat())); + grounding.setTransportProtocols(Collections.singletonList(getProtocol())); + return grounding; + } + + private TransportFormat getFormat() { + return new FormatSelector(source, targets).getTransportFormat(); + } + + private TransportProtocol getProtocol() { + return new ProtocolSelector(source, targets).getPreferredProtocol(); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java index 8895699b82..92c73cd797 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java @@ -27,20 +27,20 @@ public abstract class GroundingSelector { - protected NamedStreamPipesEntity source; - protected Set targets; - - public GroundingSelector(NamedStreamPipesEntity source, - Set targets) { - this.source = source; - this.targets = targets; - } - - protected List buildInvocables() { - List elements = new ArrayList<>(); - elements.add((InvocableStreamPipesEntity) source); - elements.addAll(targets); - - return elements; - } + protected NamedStreamPipesEntity source; + protected Set targets; + + public GroundingSelector(NamedStreamPipesEntity source, + Set targets) { + this.source = source; + this.targets = targets; + } + + protected List buildInvocables() { + List elements = new ArrayList<>(); + elements.add((InvocableStreamPipesEntity) source); + elements.addAll(targets); + + return elements; + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java index f8564c9f24..3969c159b8 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java @@ -33,7 +33,11 @@ import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo; import org.apache.streampipes.model.pipeline.PipelineModification; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; public class PipelineModificationGenerator { @@ -90,7 +94,8 @@ private void addModification(NamedStreamPipesEntity source, edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.complete(source.getDom(), t.getDom())); } catch (SpValidationException e) { //e.getErrorLog().forEach(log -> validationInfos.add(PipelineElementValidationInfo.error(log.toString()))); - edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.invalid(source.getDom(), t.getDom(), toNotifications(e.getErrorLog()))); + edgeValidations.put(makeKey(source, t), + PipelineEdgeValidation.invalid(source.getDom(), t.getDom(), toNotifications(e.getErrorLog()))); modification.setPipelineElementValid(false); } modification.setValidationInfos(validationInfos); @@ -105,7 +110,7 @@ private String makeKey(NamedStreamPipesEntity source, return source.getDom() + "-" + t.getDom(); } - private List toList(Map map) { + private List toList(Map map) { return new ArrayList<>(map.values()); } @@ -123,16 +128,16 @@ private void buildModification(PipelineModification modification, private Set getConnections(NamedStreamPipesEntity source) { Set outgoingEdges = pipelineGraph.outgoingEdgesOf(source); return outgoingEdges - .stream() - .map(pipelineGraph::getEdgeTarget) - .map(g -> (InvocableStreamPipesEntity) g) - .collect(Collectors.toSet()); + .stream() + .map(pipelineGraph::getEdgeTarget) + .map(g -> (InvocableStreamPipesEntity) g) + .collect(Collectors.toSet()); } private List toNotifications(List matchingResultMessages) { return matchingResultMessages - .stream() - .map(m -> new Notification(m.getTitle(), m.toString())) - .collect(Collectors.toList()); + .stream() + .map(m -> new Notification(m.getTitle(), m.toString())) + .collect(Collectors.toList()); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java index c5cb3d943a..0775550c7c 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java @@ -28,7 +28,10 @@ import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineModification; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; public class PipelineVerificationHandlerV2 { @@ -57,7 +60,8 @@ public List verifyAndBuildGraphs(boolean ignoreUnconfigu if (pipelineElement instanceof DataProcessorInvocation) { ((DataProcessorInvocation) pipelineElement).setOutputStream(modification.getOutputStream()); if (((DataProcessorInvocation) pipelineElement).getOutputStream().getEventGrounding() == null) { - EventGrounding grounding = new GroundingBuilder(pipelineElement, Collections.emptySet()).getEventGrounding(); + EventGrounding grounding = + new GroundingBuilder(pipelineElement, Collections.emptySet()).getEventGrounding(); ((DataProcessorInvocation) pipelineElement).getOutputStream().setEventGrounding(grounding); } if (modification.getOutputStrategies() != null) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java index cccd3e1fa1..48e211cb84 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java @@ -24,7 +24,11 @@ import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.base.NamedStreamPipesEntity; -import org.apache.streampipes.model.grounding.*; +import org.apache.streampipes.model.grounding.JmsTransportProtocol; +import org.apache.streampipes.model.grounding.KafkaTransportProtocol; +import org.apache.streampipes.model.grounding.MqttTransportProtocol; +import org.apache.streampipes.model.grounding.NatsTransportProtocol; +import org.apache.streampipes.model.grounding.TransportProtocol; import java.util.List; import java.util.Set; @@ -48,17 +52,17 @@ public TransportProtocol getPreferredProtocol() { .getTransportProtocol(); } else { for (SpProtocol prioritizedProtocol : prioritizedProtocols) { - if (prioritizedProtocol.getProtocolClass().equals(KafkaTransportProtocol.class.getCanonicalName()) && - supportsProtocol(KafkaTransportProtocol.class)) { + if (prioritizedProtocol.getProtocolClass().equals(KafkaTransportProtocol.class.getCanonicalName()) + && supportsProtocol(KafkaTransportProtocol.class)) { return kafkaTopic(); - } else if (prioritizedProtocol.getProtocolClass().equals(JmsTransportProtocol.class.getCanonicalName()) && - supportsProtocol(JmsTransportProtocol.class)) { + } else if (prioritizedProtocol.getProtocolClass().equals(JmsTransportProtocol.class.getCanonicalName()) + && supportsProtocol(JmsTransportProtocol.class)) { return jmsTopic(); - } else if (prioritizedProtocol.getProtocolClass().equals(MqttTransportProtocol.class.getCanonicalName()) && - supportsProtocol(MqttTransportProtocol.class)) { + } else if (prioritizedProtocol.getProtocolClass().equals(MqttTransportProtocol.class.getCanonicalName()) + && supportsProtocol(MqttTransportProtocol.class)) { return mqttTopic(); - } else if (prioritizedProtocol.getProtocolClass().equals(NatsTransportProtocol.class.getCanonicalName()) && - supportsProtocol(NatsTransportProtocol.class)) { + } else if (prioritizedProtocol.getProtocolClass().equals(NatsTransportProtocol.class.getCanonicalName()) + && supportsProtocol(NatsTransportProtocol.class)) { return natsTopic(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/EmptyRequirementsSelectorGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/EmptyRequirementsSelectorGenerator.java index 31de0c74e5..1b8d64c4b7 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/EmptyRequirementsSelectorGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/EmptyRequirementsSelectorGenerator.java @@ -33,13 +33,13 @@ public EmptyRequirementsSelectorGenerator(List inputStreams) { @Override public List generateSelectors() { List selectors = new ArrayList<>(new PropertySelectorGenerator(inputStreams - .get(0).getEventSchema().getEventProperties(), true) - .generateSelectors(PropertySelectorConstants.FIRST_STREAM_ID_PREFIX)); + .get(0).getEventSchema().getEventProperties(), true) + .generateSelectors(PropertySelectorConstants.FIRST_STREAM_ID_PREFIX)); if (inputStreams.size() > 1) { selectors.addAll(new PropertySelectorGenerator(inputStreams - .get(1).getEventSchema().getEventProperties(), true) - .generateSelectors(PropertySelectorConstants.SECOND_STREAM_ID_PREFIX)); + .get(1).getEventSchema().getEventProperties(), true) + .generateSelectors(PropertySelectorConstants.SECOND_STREAM_ID_PREFIX)); } return selectors; diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java index 53447cf054..079ccf50c6 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java @@ -44,7 +44,7 @@ public MappingPropertyCalculator(EventSchema schema, List availablePrope public List matchedPropertySelectors() { List matchedSelectors = new ArrayList<>(); - for(String propertySelector : availablePropertySelectors) { + for (String propertySelector : availablePropertySelectors) { List offeredProperties = getEventProperty(propertySelector); if (offeredProperties.size() == 1) { if (new PropertyMatch().match(offeredProperties.get(0), requirement, new ArrayList<>())) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/RequirementsSelectorGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/RequirementsSelectorGenerator.java index 5f53a4e350..13cd8095d7 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/RequirementsSelectorGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/RequirementsSelectorGenerator.java @@ -30,8 +30,7 @@ public class RequirementsSelectorGenerator extends AbstractRequirementsSelectorG private String requirementSelector; private InvocableStreamPipesEntity rootPipelineElement; - public RequirementsSelectorGenerator(List inputStreams, - InvocableStreamPipesEntity rootPipelineElement, + public RequirementsSelectorGenerator(List inputStreams, InvocableStreamPipesEntity rootPipelineElement, String requirementSelector) { super(inputStreams); this.rootPipelineElement = rootPipelineElement; @@ -40,18 +39,17 @@ public RequirementsSelectorGenerator(List inputStreams, @Override public List generateSelectors() { - PropertyRequirementSelector selector = new PropertyRequirementSelector - (requirementSelector); + PropertyRequirementSelector selector = new PropertyRequirementSelector(requirementSelector); - EventProperty propertyRequirement = selector.findPropertyRequirement - (rootPipelineElement.getStreamRequirements()); + EventProperty propertyRequirement = selector.findPropertyRequirement(rootPipelineElement.getStreamRequirements()); SpDataStream inputStream = selector.getAffectedStream(inputStreams); - List availablePropertySelectors = new PropertySelectorGenerator(inputStream - .getEventSchema(), true).generateSelectors(selector.getAffectedStreamPrefix()); + List availablePropertySelectors = + new PropertySelectorGenerator(inputStream.getEventSchema(), true).generateSelectors( + selector.getAffectedStreamPrefix()); - return new MappingPropertyCalculator(inputStream.getEventSchema(), - availablePropertySelectors, propertyRequirement).matchedPropertySelectors(); + return new MappingPropertyCalculator(inputStream.getEventSchema(), availablePropertySelectors, + propertyRequirement).matchedPropertySelectors(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/RequirementsSelectorGeneratorFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/RequirementsSelectorGeneratorFactory.java index f5eaba5501..f302d7cf5a 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/RequirementsSelectorGeneratorFactory.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/RequirementsSelectorGeneratorFactory.java @@ -25,13 +25,14 @@ public class RequirementsSelectorGeneratorFactory { - public static AbstractRequirementsSelectorGenerator getRequirementsSelector(MappingProperty mappingProperty, - List inputStreams, - InvocableStreamPipesEntity rootPipelineElement) { + public static AbstractRequirementsSelectorGenerator getRequirementsSelector( + MappingProperty mappingProperty, + List inputStreams, + InvocableStreamPipesEntity rootPipelineElement) { if (!mappingProperty.getRequirementSelector().equals("")) { return new RequirementsSelectorGenerator(inputStreams, - rootPipelineElement, - mappingProperty.getRequirementSelector()); + rootPipelineElement, + mappingProperty.getRequirementSelector()); } else { return new EmptyRequirementsSelectorGenerator(inputStreams); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/AppendOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/AppendOutputSchemaGenerator.java index 70804f418a..8a271133c2 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/AppendOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/AppendOutputSchemaGenerator.java @@ -46,11 +46,11 @@ public AppendOutputSchemaGenerator(AppendOutputStrategy strategy) { @Override public Tuple2 buildFromOneStream(SpDataStream stream) { List selectors = - new PropertySelectorGenerator(stream.getEventSchema(), true).generateSelectors(); + new PropertySelectorGenerator(stream.getEventSchema(), true).generateSelectors(); Tuple2, List> generatedOutputProperties = new - PropertySelector(stream.getEventSchema()) - .createRenamedPropertyList(selectors, appendProperties); + PropertySelector(stream.getEventSchema()) + .createRenamedPropertyList(selectors, appendProperties); EventSchema outputSchema = new EventSchema(generatedOutputProperties.k); @@ -59,14 +59,14 @@ public Tuple2 buildFromOneStream(SpDataStream @Override public Tuple2 buildFromTwoStreams(SpDataStream stream1, - SpDataStream stream2) { + SpDataStream stream2) { List selectors = - new PropertySelectorGenerator(stream1.getEventSchema(), stream2.getEventSchema(), - true).generateSelectors(); + new PropertySelectorGenerator(stream1.getEventSchema(), stream2.getEventSchema(), + true).generateSelectors(); Tuple2, List> generatedOutputProperties = new - PropertySelector(stream1.getEventSchema(), stream2.getEventSchema()) - .createRenamedPropertyList(selectors, appendProperties); + PropertySelector(stream1.getEventSchema(), stream2.getEventSchema()) + .createRenamedPropertyList(selectors, appendProperties); EventSchema outputSchema = new EventSchema(generatedOutputProperties.k); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomOutputSchemaGenerator.java index 051b2bf2af..09626cafb4 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomOutputSchemaGenerator.java @@ -45,16 +45,16 @@ public CustomOutputSchemaGenerator(CustomOutputStrategy strategy) { @Override public Tuple2 buildFromOneStream(SpDataStream stream) { return new Tuple2<>(new EventSchema(new PropertySelector(stream.getEventSchema()) - .createPropertyList(selectedPropertyKeys)), outputStrategy); + .createPropertyList(selectedPropertyKeys)), outputStrategy); } @Override public Tuple2 buildFromTwoStreams(SpDataStream stream1, - SpDataStream stream2) { + SpDataStream stream2) { Tuple2, List> generatedOutputProperties = new - PropertySelector(stream1.getEventSchema(), - stream2.getEventSchema()).createRenamedPropertyList(selectedPropertyKeys); + PropertySelector(stream1.getEventSchema(), + stream2.getEventSchema()).createRenamedPropertyList(selectedPropertyKeys); EventSchema outputSchema = new EventSchema(generatedOutputProperties.k); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java index d2697c3e6e..936adabb49 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java @@ -17,10 +17,6 @@ */ package org.apache.streampipes.manager.matching.output; -import com.google.gson.JsonSyntaxException; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; -import org.apache.http.entity.ContentType; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.graph.DataProcessorInvocation; @@ -31,6 +27,11 @@ import org.apache.streampipes.serializers.json.JacksonSerializer; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; +import com.google.gson.JsonSyntaxException; +import org.apache.http.client.fluent.Request; +import org.apache.http.client.fluent.Response; +import org.apache.http.entity.ContentType; + import java.io.IOException; public class CustomTransformOutputSchemaGenerator extends OutputSchemaGenerator { @@ -55,17 +56,19 @@ public Tuple2 buildFromOneStream(SpD } @Override - public Tuple2 buildFromTwoStreams(SpDataStream stream1, SpDataStream stream2) { + public Tuple2 buildFromTwoStreams(SpDataStream stream1, + SpDataStream stream2) { return makeTuple(makeRequest()); } private EventSchema makeRequest() { try { String httpRequestBody = JacksonSerializer.getObjectMapper().writeValueAsString(dataProcessorInvocation); - String endpointUrl = new ExtensionsServiceEndpointGenerator(dataProcessorInvocation.getAppId(), SpServiceUrlProvider.DATA_PROCESSOR).getEndpointResourceUrl(); + String endpointUrl = new ExtensionsServiceEndpointGenerator(dataProcessorInvocation.getAppId(), + SpServiceUrlProvider.DATA_PROCESSOR).getEndpointResourceUrl(); Response httpResp = Request.Post(endpointUrl + "/output").bodyString(httpRequestBody, - ContentType - .APPLICATION_JSON).execute(); + ContentType + .APPLICATION_JSON).execute(); return handleResponse(httpResp); } catch (Exception e) { e.printStackTrace(); @@ -77,7 +80,7 @@ private EventSchema handleResponse(Response httpResp) throws JsonSyntaxException String resp = httpResp.returnContent().asString(); return JacksonSerializer - .getObjectMapper() - .readValue(resp, EventSchema.class); + .getObjectMapper() + .readValue(resp, EventSchema.class); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/FixedOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/FixedOutputSchemaGenerator.java index 29b265ba39..9a2add88ad 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/FixedOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/FixedOutputSchemaGenerator.java @@ -47,7 +47,7 @@ public Tuple2 buildFromOneStream(SpDataStream @Override public Tuple2 buildFromTwoStreams(SpDataStream stream1, - SpDataStream stream2) { + SpDataStream stream2) { return buildFromOneStream(stream1); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/ListOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/ListOutputSchemaGenerator.java index 5946d12f66..55212a1120 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/ListOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/ListOutputSchemaGenerator.java @@ -49,7 +49,7 @@ public Tuple2 buildFromOneStream(SpDataStream s @Override public Tuple2 buildFromTwoStreams(SpDataStream stream1, - SpDataStream stream2) { + SpDataStream stream2) { return buildFromOneStream(stream1); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/OutputSchemaFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/OutputSchemaFactory.java index 2828b8af8f..1f083f3267 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/OutputSchemaFactory.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/OutputSchemaFactory.java @@ -19,7 +19,15 @@ package org.apache.streampipes.manager.matching.output; import org.apache.streampipes.model.graph.DataProcessorInvocation; -import org.apache.streampipes.model.output.*; +import org.apache.streampipes.model.output.AppendOutputStrategy; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.model.output.CustomTransformOutputStrategy; +import org.apache.streampipes.model.output.FixedOutputStrategy; +import org.apache.streampipes.model.output.KeepOutputStrategy; +import org.apache.streampipes.model.output.ListOutputStrategy; +import org.apache.streampipes.model.output.OutputStrategy; +import org.apache.streampipes.model.output.TransformOutputStrategy; +import org.apache.streampipes.model.output.UserDefinedOutputStrategy; public class OutputSchemaFactory { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/OutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/OutputSchemaGenerator.java index 01c45a28d8..1fcf94adaa 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/OutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/OutputSchemaGenerator.java @@ -34,7 +34,7 @@ public OutputSchemaGenerator(T outputStrategy) { public abstract Tuple2 buildFromOneStream(SpDataStream stream); public abstract Tuple2 buildFromTwoStreams(SpDataStream stream1, - SpDataStream stream2); + SpDataStream stream2); public Tuple2 makeTuple(EventSchema eventSchema) { return new Tuple2<>(eventSchema, outputStrategy); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/PropertyDuplicateRemover.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/PropertyDuplicateRemover.java index 8f54b6ea4f..811e11216f 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/PropertyDuplicateRemover.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/PropertyDuplicateRemover.java @@ -45,7 +45,8 @@ public List rename() { while (isAlreadyDefined(existingProperties, newProperty)) { if (newProperty instanceof EventPropertyPrimitive) { EventPropertyPrimitive primitive = (EventPropertyPrimitive) newProperty; - newProperty = new EventPropertyPrimitive(primitive.getRuntimeType(), primitive.getRuntimeName() + i, "", primitive.getDomainProperties()); + newProperty = new EventPropertyPrimitive(primitive.getRuntimeType(), primitive.getRuntimeName() + i, "", + primitive.getDomainProperties()); newProperty.setElementId(primitive.getElementId() + i); } if (newProperty instanceof EventPropertyNested) { @@ -57,7 +58,9 @@ public List rename() { for (EventProperty np : nested.getEventProperties()) { if (np instanceof EventPropertyPrimitive) { EventPropertyPrimitive thisPrimitive = (EventPropertyPrimitive) np; - EventProperty newNested = new EventPropertyPrimitive(thisPrimitive.getRuntimeType(), thisPrimitive.getRuntimeName(), "", thisPrimitive.getDomainProperties()); + EventProperty newNested = + new EventPropertyPrimitive(thisPrimitive.getRuntimeType(), thisPrimitive.getRuntimeName(), "", + thisPrimitive.getDomainProperties()); newNested.setElementId(thisPrimitive.getElementId()); nestedProperties.add(newNested); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/RenameOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/RenameOutputSchemaGenerator.java index 57c3e94a9b..138a641326 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/RenameOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/RenameOutputSchemaGenerator.java @@ -52,7 +52,7 @@ public Tuple2 buildFromTwoStreams(SpDataStream properties.addAll(stream1.getEventSchema().getEventProperties()); if (outputStrategy.isKeepBoth()) { properties.addAll(new PropertyDuplicateRemover(properties, - stream2.getEventSchema().getEventProperties()).rename()); + stream2.getEventSchema().getEventProperties()).rename()); } resultSchema.setEventProperties(properties); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java index 370740b105..711b5c18f7 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java @@ -26,17 +26,25 @@ import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventPropertyPrimitive; import org.apache.streampipes.model.schema.EventSchema; -import org.apache.streampipes.model.staticproperty.*; +import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty; +import org.apache.streampipes.model.staticproperty.MappingPropertyUnary; +import org.apache.streampipes.model.staticproperty.Option; +import org.apache.streampipes.model.staticproperty.SelectionStaticProperty; +import org.apache.streampipes.model.staticproperty.StaticProperty; import org.apache.streampipes.model.util.Cloner; import org.apache.streampipes.sdk.helpers.Tuple2; import java.net.URI; -import java.util.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; public class TransformOutputSchemaGenerator extends OutputSchemaGenerator { - private List staticProperties; + private List staticProperties; public static TransformOutputSchemaGenerator from(OutputStrategy strategy, DataProcessorInvocation invocation) { @@ -44,7 +52,7 @@ public static TransformOutputSchemaGenerator from(OutputStrategy strategy, } public TransformOutputSchemaGenerator(TransformOutputStrategy strategy, DataProcessorInvocation - invocation) { + invocation) { super(strategy); this.staticProperties = invocation.getStaticProperties(); } @@ -57,7 +65,7 @@ public Tuple2 buildFromOneStream(SpDataStr EventSchema inSchema = stream.getEventSchema(); outputStrategy.getTransformOperations().forEach(to -> { Optional mappingPropertyOpt = findMappingProperty(to.getMappingPropertyInternalName(), - staticProperties); + staticProperties); if (mappingPropertyOpt.isPresent()) { MappingPropertyUnary mappingProperty = mappingPropertyOpt.get(); @@ -65,39 +73,39 @@ public Tuple2 buildFromOneStream(SpDataStr if (selectedProperty != null) { Optional eventPropertyOpt = findEventProperty( - mappingPropertyOpt.get().getSelectedProperty(), - inSchema.getEventProperties() + mappingPropertyOpt.get().getSelectedProperty(), + inSchema.getEventProperties() ); if (eventPropertyOpt.isPresent()) { EventProperty eventProperty = eventPropertyOpt.get(); modifiedEventProperties.put(eventProperty.getElementId(), modifyEventProperty(cloneEp(eventProperty), to, - staticProperties)); + staticProperties)); } } } }); List newProperties = inSchema.getEventProperties() - .stream() - .map(ep -> modifiedEventProperties.getOrDefault(ep.getElementId(), ep)) - .collect(Collectors.toList()); + .stream() + .map(ep -> modifiedEventProperties.getOrDefault(ep.getElementId(), ep)) + .collect(Collectors.toList()); outSchema.setEventProperties(newProperties); return makeTuple(outSchema); } private EventProperty modifyEventProperty(EventProperty eventProperty, TransformOperation to, List - staticProperties) { + staticProperties) { if (to.getTargetValue() != null) { return modifyEventProperty(eventProperty, TransformOperationType.valueOf(to.getTransformationScope()), to - .getTargetValue()); + .getTargetValue()); } else { Optional sp = findStaticProperty(staticProperties, to.getSourceStaticProperty()); if (sp.isPresent()) { return modifyEventProperty(eventProperty, sp.get(), TransformOperationType.valueOf(to.getTransformationScope - ())); + ())); } } return new Cloner().property(eventProperty); @@ -105,14 +113,14 @@ private EventProperty modifyEventProperty(EventProperty eventProperty, Transform private EventProperty modifyEventProperty(EventProperty eventProperty, StaticProperty staticProperty, TransformOperationType - transformOperationType) { + transformOperationType) { if (staticProperty instanceof SelectionStaticProperty) { return modifyEventProperty(eventProperty, transformOperationType, findSelected(((SelectionStaticProperty) - staticProperty).getOptions()).getInternalName()); + staticProperty).getOptions()).getInternalName()); } else if (staticProperty instanceof FreeTextStaticProperty) { return modifyEventProperty(eventProperty, transformOperationType, ((FreeTextStaticProperty) staticProperty) - .getValue - ()); + .getValue + ()); } return eventProperty; @@ -120,23 +128,24 @@ private EventProperty modifyEventProperty(EventProperty eventProperty, StaticPro private Option findSelected(List