Skip to content

Commit

Permalink
Harmonize REST endpoints for retrieval of pipeline elements (#1333)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Feb 21, 2023
1 parent e4bcf07 commit b45645a
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ public DataProcessorApi(StreamPipesClientConfig clientConfig) {
@Override
protected StreamPipesApiPath getBaseResourcePath() {
return StreamPipesApiPath.fromBaseApiPath()
.addToPath("sepas")
.addToPath("own");
.addToPath("sepas");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ public DataSinkInvocation getDataSinkForPipelineElement(String templateId, DataS
@Override
protected StreamPipesApiPath getBaseResourcePath() {
return StreamPipesApiPath.fromBaseApiPath()
.addToPath("actions")
.addToPath("own");
.addToPath("actions");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ public SpKafkaConsumer subscribe(SpDataStream stream,
@Override
protected StreamPipesApiPath getBaseResourcePath() {
return StreamPipesApiPath.fromBaseApiPath()
.addToPath("streams")
.addToPath("own");
.addToPath("streams");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.streampipes.storage.api.CRUDStorage;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public abstract class AbstractPipelineElementResourceManager<T extends CRUDStorage<String, W>,
Expand All @@ -43,6 +44,7 @@ public List<String> findAllIdsOnly() {
public List<X> findAllAsInvocation() {
return findAll()
.stream()
.filter(Objects::nonNull)
.map(this::toInvocation)
.collect(Collectors.toList());
}
Expand All @@ -52,7 +54,12 @@ public W find(String elementId) {
}

public X findAsInvocation(String elementId) {
return toInvocation(find(elementId));
var element = find(elementId);
if (Objects.nonNull(element)) {
return toInvocation(find(elementId));
} else {
throw new IllegalArgumentException("Could not find element with id " + elementId);
}
}

public void delete(String elementId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.streampipes.rest.impl.pe;

import org.apache.streampipes.model.StreamPipesErrorMessage;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.message.NotificationType;
Expand Down Expand Up @@ -55,7 +56,6 @@ public List<DataProcessorDescription> getAvailable() {
}

@GET
@Path("/own")
@JacksonSerialized
@Produces({MediaType.APPLICATION_JSON})
@PreAuthorize(AuthConstants.HAS_READ_PIPELINE_ELEMENT_PRIVILEGE)
Expand All @@ -65,7 +65,7 @@ public List<DataProcessorInvocation> getOwn() {
}

@DELETE
@Path("/own/{elementId}")
@Path("/{elementId}")
@Produces(MediaType.APPLICATION_JSON)
@JacksonSerialized
@PreAuthorize(AuthConstants.HAS_DELETE_PIPELINE_ELEMENT_PRIVILEGE)
Expand All @@ -79,8 +79,12 @@ public Response removeOwn(@PathParam("elementId") String elementId) {
@Produces(MediaType.APPLICATION_JSON)
@JacksonSerialized
@PreAuthorize(AuthConstants.HAS_READ_PIPELINE_ELEMENT_PRIVILEGE)
public DataProcessorInvocation getElement(@PathParam("elementId") String elementId) {
return getDataProcessorResourceManager().findAsInvocation(elementId);
public Response getElement(@PathParam("elementId") String elementId) {
try {
return ok(getDataProcessorResourceManager().findAsInvocation(elementId));
} catch (IllegalArgumentException e) {
return badRequest(StreamPipesErrorMessage.from(e));
}
}

private DataProcessorResourceManager getDataProcessorResourceManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.streampipes.rest.impl.pe;

import org.apache.streampipes.model.StreamPipesErrorMessage;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.message.NotificationType;
Expand Down Expand Up @@ -56,7 +57,6 @@ public List<DataSinkDescription> getAvailable() {
}

@GET
@Path("/own")
@Produces({MediaType.APPLICATION_JSON, SpMediaType.JSONLD})
@JacksonSerialized
@PreAuthorize(AuthConstants.HAS_READ_PIPELINE_ELEMENT_PRIVILEGE)
Expand All @@ -66,7 +66,7 @@ public List<DataSinkInvocation> getOwn() {
}

@DELETE
@Path("/own/{elementId}")
@Path("/{elementId}")
@Produces(MediaType.APPLICATION_JSON)
@JacksonSerialized
@PreAuthorize(AuthConstants.HAS_DELETE_PIPELINE_ELEMENT_PRIVILEGE)
Expand All @@ -80,8 +80,12 @@ public Response removeOwn(@PathParam("elementId") String elementId) {
@Produces(MediaType.APPLICATION_JSON)
@JacksonSerialized
@PreAuthorize(AuthConstants.HAS_READ_PIPELINE_ELEMENT_PRIVILEGE)
public DataSinkInvocation getElement(@PathParam("elementId") String elementId) {
return getDataSinkResourceManager().findAsInvocation(elementId);
public Response getElement(@PathParam("elementId") String elementId) {
try {
return ok(getDataSinkResourceManager().findAsInvocation(elementId));
} catch (IllegalArgumentException e) {
return badRequest(StreamPipesErrorMessage.from(e));
}
}

private DataSinkResourceManager getDataSinkResourceManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.streampipes.rest.impl.pe;

import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.StreamPipesErrorMessage;
import org.apache.streampipes.model.message.NotificationType;
import org.apache.streampipes.resource.management.DataStreamResourceManager;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
Expand Down Expand Up @@ -80,8 +81,12 @@ public Response delete(@PathParam("elementId") String elementId) {
@Produces(MediaType.APPLICATION_JSON)
@JacksonSerialized
@PreAuthorize(AuthConstants.HAS_READ_PIPELINE_ELEMENT_PRIVILEGE)
public SpDataStream getElement(@PathParam("elementId") String elementId) {
return getDataStreamResourceManager().findAsInvocation(elementId);
public Response getElement(@PathParam("elementId") String elementId) {
try {
return ok(getDataStreamResourceManager().findAsInvocation(elementId));
} catch (IllegalArgumentException e) {
return badRequest(StreamPipesErrorMessage.from(e));
}
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class PipelineElementService {
) {}

getDataProcessors(): Observable<DataProcessorInvocation[]> {
return this.http.get(this.dataProcessorsUrl + '/own').pipe(
return this.http.get(this.dataProcessorsUrl).pipe(
map(data => {
return (data as []).map(dpi =>
DataProcessorInvocation.fromData(dpi),
Expand All @@ -48,7 +48,7 @@ export class PipelineElementService {
}

getDataSinks(): Observable<DataSinkInvocation[]> {
return this.http.get(this.dataSinksUrl + '/own').pipe(
return this.http.get(this.dataSinksUrl).pipe(
map(data => {
return (data as []).map(dpi =>
DataSinkInvocation.fromData(dpi),
Expand All @@ -58,7 +58,7 @@ export class PipelineElementService {
}

getDataStreams(): Observable<SpDataStream[]> {
return this.http.get(this.dataStreamsUrl + '/own').pipe(
return this.http.get(this.dataStreamsUrl).pipe(
map(data => {
return (data as []).map(dpi => {
if (
Expand Down

0 comments on commit b45645a

Please sign in to comment.