Skip to content

Commit

Permalink
[#820] Activate checkstyle for streampipes-pipeline-management
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe committed Dec 15, 2022
1 parent 5581d90 commit a581707
Show file tree
Hide file tree
Showing 147 changed files with 2,203 additions and 1,969 deletions.
9 changes: 9 additions & 0 deletions streampipes-pipeline-management/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,13 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ private void replaceImagePaths() {

private String makeAssetLocation(String appId) {
return AssetConstants.ASSET_BASE_DIR
+ File.separator + appId;
+ File.separator + appId;
}

private String makeDocumentationAssetPath(String appId) {
return makeAssetLocation(appId) + File.separator + GlobalStreamPipesConstants
.STD_DOCUMENTATION_NAME;
.STD_DOCUMENTATION_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.streampipes.manager.assets;

import org.apache.http.client.fluent.Request;
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import org.apache.http.client.fluent.Request;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -41,10 +42,10 @@ public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider,
public InputStream fetchPipelineElementAssets() throws IOException, NoServiceEndpointsAvailableException {
String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, spServiceUrlProvider).getEndpointResourceUrl();
return Request
.Get(endpointUrl + ASSET_ENDPOINT_APPENDIX)
.execute()
.returnContent()
.asStream();
.Get(endpointUrl + ASSET_ENDPOINT_APPENDIX)
.execute()
.returnContent()
.asStream();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
*/
package org.apache.streampipes.manager.assets;

import org.apache.commons.io.FileUtils;
import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -46,7 +47,7 @@ public static byte[] getAsset(String appId, String assetName) throws IOException
public static void storeAsset(SpServiceUrlProvider spServiceUrlProvider,
String appId) throws IOException, NoServiceEndpointsAvailableException {
InputStream assetStream = new AssetFetcher(spServiceUrlProvider, appId)
.fetchPipelineElementAssets();
.fetchPipelineElementAssets();
new AssetExtractor(assetStream, appId).extractAssetContents();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
package org.apache.streampipes.manager.data;

import org.apache.streampipes.model.base.NamedStreamPipesEntity;

import org.jgrapht.graph.DirectedMultigraph;

public class PipelineGraph extends DirectedMultigraph<NamedStreamPipesEntity, String> {

public PipelineGraph() {
super(String.class);
}
public PipelineGraph() {
super(String.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,52 @@

public class PipelineGraphBuilder {

private final Pipeline pipeline;
private final List<NamedStreamPipesEntity> allPipelineElements;
private final List<InvocableStreamPipesEntity> invocableElements;
private final Pipeline pipeline;
private final List<NamedStreamPipesEntity> allPipelineElements;
private final List<InvocableStreamPipesEntity> invocableElements;

public PipelineGraphBuilder(Pipeline pipeline) {
this.pipeline = pipeline;
this.allPipelineElements = addAll();
this.invocableElements = addInvocable();
}

private List<NamedStreamPipesEntity> addAll() {
List<NamedStreamPipesEntity> allElements = new ArrayList<>();
allElements.addAll(pipeline.getStreams());
allElements.addAll(addInvocable());
return allElements;
}
public PipelineGraphBuilder(Pipeline pipeline) {
this.pipeline = pipeline;
this.allPipelineElements = addAll();
this.invocableElements = addInvocable();
}

private List<InvocableStreamPipesEntity> addInvocable() {
List<InvocableStreamPipesEntity> allElements = new ArrayList<>();
allElements.addAll(pipeline.getSepas());
allElements.addAll(pipeline.getActions());
return allElements;
}
private List<NamedStreamPipesEntity> addAll() {
List<NamedStreamPipesEntity> allElements = new ArrayList<>();
allElements.addAll(pipeline.getStreams());
allElements.addAll(addInvocable());
return allElements;
}

private List<InvocableStreamPipesEntity> addInvocable() {
List<InvocableStreamPipesEntity> allElements = new ArrayList<>();
allElements.addAll(pipeline.getSepas());
allElements.addAll(pipeline.getActions());
return allElements;
}

public PipelineGraph buildGraph() {
PipelineGraph pipelineGraph = new PipelineGraph();
allPipelineElements.forEach(pipelineGraph::addVertex);

for(NamedStreamPipesEntity source : allPipelineElements) {
List<InvocableStreamPipesEntity> targets = findTargets(source.getDom());
targets.forEach(t -> pipelineGraph.addEdge(source, t, createEdge(source, t)));
}
public PipelineGraph buildGraph() {
PipelineGraph pipelineGraph = new PipelineGraph();
allPipelineElements.forEach(pipelineGraph::addVertex);

return pipelineGraph;
for (NamedStreamPipesEntity source : allPipelineElements) {
List<InvocableStreamPipesEntity> targets = findTargets(source.getDom());
targets.forEach(t -> pipelineGraph.addEdge(source, t, createEdge(source, t)));
}

private List<InvocableStreamPipesEntity> findTargets(String domId) {
return invocableElements
.stream()
.filter(i -> i.getConnectedTo().contains(domId))
.collect(Collectors.toList());
}
return pipelineGraph;
}

private String createEdge(NamedStreamPipesEntity sourceVertex,
NamedStreamPipesEntity targetVertex) {
return sourceVertex.getDom() + "-" + targetVertex.getDom();
}
private List<InvocableStreamPipesEntity> findTargets(String domId) {
return invocableElements
.stream()
.filter(i -> i.getConnectedTo().contains(domId))
.collect(Collectors.toList());
}

private String createEdge(NamedStreamPipesEntity sourceVertex,
NamedStreamPipesEntity targetVertex) {
return sourceVertex.getDom() + "-" + targetVertex.getDom();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@

public class PipelineGraphHelpers {

public static List<SpDataStream> findStreams(PipelineGraph pipelineGraph) {
return find(pipelineGraph, SpDataStream.class);
}
public static List<SpDataStream> findStreams(PipelineGraph pipelineGraph) {
return find(pipelineGraph, SpDataStream.class);
}

public static List<InvocableStreamPipesEntity> findInvocableElements(PipelineGraph pipelineGraph) {
return find(pipelineGraph, InvocableStreamPipesEntity.class);
}
public static List<InvocableStreamPipesEntity> findInvocableElements(PipelineGraph pipelineGraph) {
return find(pipelineGraph, InvocableStreamPipesEntity.class);
}

private static <T> List<T> find(PipelineGraph pipelineGraph, Class<T> clazz) {
return pipelineGraph
.vertexSet()
.stream()
.filter(clazz::isInstance)
.map(clazz::cast)
.collect(Collectors.toList());
}
private static <T> List<T> find(PipelineGraph pipelineGraph, Class<T> clazz) {
return pipelineGraph
.vertexSet()
.stream()
.filter(clazz::isInstance)
.map(clazz::cast)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,52 @@

package org.apache.streampipes.manager.endpoint;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.http.client.fluent.Request;
import org.apache.http.message.BasicHeader;
import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
import org.apache.streampipes.serializers.json.JacksonSerializer;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.http.client.fluent.Request;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.MediaType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class EndpointItemFetcher {
Logger logger = LoggerFactory.getLogger(EndpointItemFetcher.class);
Logger logger = LoggerFactory.getLogger(EndpointItemFetcher.class);

private List<ExtensionsServiceEndpoint> extensionsServiceEndpoints;
private List<ExtensionsServiceEndpoint> extensionsServiceEndpoints;

public EndpointItemFetcher(List<ExtensionsServiceEndpoint> extensionsServiceEndpoints) {
this.extensionsServiceEndpoints = extensionsServiceEndpoints;
}
public EndpointItemFetcher(List<ExtensionsServiceEndpoint> extensionsServiceEndpoints) {
this.extensionsServiceEndpoints = extensionsServiceEndpoints;
}

public List<ExtensionsServiceEndpointItem> getItems() {
List<ExtensionsServiceEndpointItem> endpointItems = new ArrayList<>();
extensionsServiceEndpoints.forEach(e -> endpointItems.addAll(getEndpointItems(e)));
return endpointItems;
}
public List<ExtensionsServiceEndpointItem> getItems() {
List<ExtensionsServiceEndpointItem> endpointItems = new ArrayList<>();
extensionsServiceEndpoints.forEach(e -> endpointItems.addAll(getEndpointItems(e)));
return endpointItems;
}

private List<ExtensionsServiceEndpointItem> getEndpointItems(ExtensionsServiceEndpoint e) {
try {
String result = Request.Get(e.getEndpointUrl())
.addHeader(new BasicHeader("Accept", MediaType.APPLICATION_JSON))
.connectTimeout(1000)
.execute()
.returnContent()
.asString();
private List<ExtensionsServiceEndpointItem> getEndpointItems(ExtensionsServiceEndpoint e) {
try {
String result = Request.Get(e.getEndpointUrl())
.addHeader(new BasicHeader("Accept", MediaType.APPLICATION_JSON))
.connectTimeout(1000)
.execute()
.returnContent()
.asString();

return JacksonSerializer.getObjectMapper().readValue(result, new TypeReference<List<ExtensionsServiceEndpointItem>>() {});
} catch (IOException e1) {
logger.warn("Processing Element Descriptions could not be fetched from RDF endpoint: " + e.getEndpointUrl());
return new ArrayList<>();
}
return JacksonSerializer.getObjectMapper()
.readValue(result, new TypeReference<List<ExtensionsServiceEndpointItem>>() {
});
} catch (IOException e1) {
logger.warn("Processing Element Descriptions could not be fetched from RDF endpoint: " + e.getEndpointUrl());
return new ArrayList<>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
*/
package org.apache.streampipes.manager.endpoint;

import org.apache.http.client.fluent.Request;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.NotificationType;
import org.apache.streampipes.model.message.Notifications;

import org.apache.http.client.fluent.Request;

import javax.ws.rs.core.MediaType;

import java.io.IOException;
import java.net.URLDecoder;

Expand All @@ -45,10 +47,10 @@ public Message parseAndAddEndpointItem(String url,

private String parseURIContent(String url) throws IOException {
return Request
.Get(url)
.addHeader("Accept", MediaType.APPLICATION_JSON)
.execute()
.returnContent()
.asString();
.Get(url)
.addHeader("Accept", MediaType.APPLICATION_JSON)
.execute()
.returnContent()
.asString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -56,16 +57,18 @@ public String getEndpointBaseUrl() throws NoServiceEndpointsAvailableException {

private List<String> getServiceEndpoints() {
return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
Collections.singletonList(this.spServiceUrlProvider.getServiceTag(appId).asString()));
Collections.singletonList(this.spServiceUrlProvider.getServiceTag(appId).asString()));
}

private String selectService() throws NoServiceEndpointsAvailableException {
List<String> serviceEndpoints = getServiceEndpoints();
if (serviceEndpoints.size() > 0) {
return getServiceEndpoints().get(0);
} else {
LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, this.spServiceUrlProvider.getServiceTag(appId).asString());
throw new NoServiceEndpointsAvailableException("Could not find any matching service endpoints - are all software components running?");
LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId,
this.spServiceUrlProvider.getServiceTag(appId).asString());
throw new NoServiceEndpointsAvailableException(
"Could not find any matching service endpoints - are all software components running?");
}
}
}
Loading

0 comments on commit a581707

Please sign in to comment.