Skip to content

Commit

Permalink
Merge pull request #1078 from apache/SP-1077
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe authored Jan 11, 2023
2 parents 1c80ccd + 71e5bca commit 0b1f23f
Show file tree
Hide file tree
Showing 7 changed files with 0 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventSchema;

import java.util.List;
import java.util.Map;

public interface IProtocol extends Connector {

IProtocol getInstance(ProtocolDescription protocolDescription,
Expand All @@ -36,8 +33,6 @@ IProtocol getInstance(ProtocolDescription protocolDescription,

GuessSchema getGuessSchema() throws ParseException;

List<Map<String, Object>> getNElements(int n) throws ParseException;

void run(IAdapterPipeline adapterPipeline) throws AdapterException;

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FileProtocol extends Protocol {

Expand Down Expand Up @@ -134,32 +132,6 @@ public GuessSchema getGuessSchema() throws ParseException {
}


@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
List<Map<String, Object>> result = new ArrayList<>();

List<byte[]> dataByteArray = new ArrayList<>();
try {
InputStream dataInputStream = FileProtocolUtils.getFileInputStream(this.selectedFilename);
dataByteArray = parser.parseNEvents(dataInputStream, n);
} catch (FileNotFoundException e) {
e.printStackTrace();
}

// Check that result size is n. Currently just an error is logged. Maybe change to an exception
if (dataByteArray.size() < n) {
logger.error("Error in File Protocol! User required: " + n + " elements but the resource just had: "
+ dataByteArray.size());
}

for (byte[] b : dataByteArray) {
result.add(format.parse(b));
}

return result;
}


@Override
public String getId() {
return ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HttpProtocol extends Protocol {

Expand Down Expand Up @@ -122,27 +120,6 @@ public GuessSchema getGuessSchema() throws ParseException {
return result;
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {

List<Map<String, Object>> result = new ArrayList<>();

InputStream dataInputStream = getDataFromEndpoint();

List<byte[]> dataByteArray = parser.parseNEvents(dataInputStream, n);

// Check that result size is n. Currently just an error is logged. Maybe change to an exception
if (dataByteArray.size() < n) {
logger.error("Error in HttpProtocol! User required: " + n + " elements but the resource just had: "
+ dataByteArray.size());
}

for (byte[] b : dataByteArray) {
result.add(format.parse(b));
}

return result;
}

public InputStream getDataFromEndpoint() throws ParseException {
InputStream result = null;
Expand All @@ -153,12 +130,6 @@ public InputStream getDataFromEndpoint() throws ParseException {
.socketTimeout(100000)
.execute().returnContent().asStream();

// if (s.startsWith("ï")) {
// s = s.substring(3);
// }

// result = IOUtils.toInputStream(s, "UTF-8");

} catch (IOException e) {
throw new ParseException("Could not receive Data from: " + url);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import org.apache.streampipes.extensions.management.connect.adapter.model.generic.Protocol;
import org.apache.streampipes.model.connect.guess.GuessSchema;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public abstract class BrokerProtocol extends Protocol {

Expand Down Expand Up @@ -55,17 +53,6 @@ public GuessSchema getGuessSchema() throws ParseException {
}
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
List<byte[]> resultEventsByte = getNByteElements(n);
List<Map<String, Object>> result = new ArrayList<>();
for (byte[] event : resultEventsByte) {
result.add(format.parse(event));
}

return result;
}

protected abstract List<byte[]> getNByteElements(int n) throws ParseException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class FileStreamProtocol extends Protocol {

private static Logger logger = LoggerFactory.getLogger(FileStreamProtocol.class);

public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.file";

//private String filePath;
private String selectedFileName;
// private String timestampKey;
private boolean replaceTimestamp;
private float speedUp;
private int timeBetweenReplay;
Expand Down Expand Up @@ -165,7 +162,6 @@ public Protocol getInstance(ProtocolDescription protocolDescription, IParser par
StaticPropertyExtractor.from(protocolDescription.getConfig(), new ArrayList<>());

List<String> replaceTimestampStringList = extractor.selectedMultiValues("replaceTimestamp", String.class);
// String replaceTimestampString = extractor.selectedSingleValueOption("replaceTimestamp");
boolean replaceTimestamp = replaceTimestampStringList.size() != 0;

float speedUp = extractor.singleValueParameter("speed", Float.class);
Expand Down Expand Up @@ -209,8 +205,6 @@ public ProtocolDescription declareModel() {
.sourceType(AdapterSourceType.STREAM)
.category(AdapterType.Generic)
.requiredFile(Labels.withId("filePath"), Filetypes.CSV, Filetypes.JSON, Filetypes.XML)
// .requiredSingleValueSelection(Labels.withId("replaceTimestamp"),
// Options.from("True", "False"))
.requiredMultiValueSelection(Labels.withId("replaceTimestamp"),
Options.from(""))
.requiredFloatParameter(Labels.withId("speed"))
Expand All @@ -231,27 +225,6 @@ public GuessSchema getGuessSchema() throws ParseException {
}
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
List<Map<String, Object>> result = new ArrayList<>();

InputStream dataInputStream = getDataFromEndpoint();

List<byte[]> dataByteArray = parser.parseNEvents(dataInputStream, n);

// Check that result size is n. Currently, just an error is logged. Maybe change to an exception
if (dataByteArray.size() < n) {
logger.error("Error in File Protocol! User required: " + n + " elements but the resource just had: "
+ dataByteArray.size());
}

for (byte[] b : dataByteArray) {
result.add(format.parse(b));
}

return result;
}


@Override
public String getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HttpServerProtocol extends Protocol {

Expand Down Expand Up @@ -153,11 +151,6 @@ private String extractRuntimeType(String type) {
}
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
return null;
}

@Override
public void run(IAdapterPipeline adapterPipeline) {
SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HttpStreamProtocol extends PullProtocol {

Expand All @@ -51,7 +49,6 @@ public class HttpStreamProtocol extends PullProtocol {

private static final String URL_PROPERTY = "url";
private static final String INTERVAL_PROPERTY = "interval";
private static final String ACCESS_TOKEN_PROPERTY = "access_token";

private String url;
private String accessToken;
Expand Down Expand Up @@ -93,8 +90,6 @@ public ProtocolDescription declareModel() {
.category(AdapterType.Generic)
.requiredTextParameter(Labels.withId(URL_PROPERTY))
.requiredIntegerParameter(Labels.withId(INTERVAL_PROPERTY))
//.requiredTextParameter(Labels.from(ACCESS_TOKEN_PROPERTY, "Access Token", "Http
// Access Token"))
.build();
}

Expand All @@ -117,28 +112,6 @@ public GuessSchema getGuessSchema() throws ParseException {
return result;
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
List<Map<String, Object>> result = new ArrayList<>();

InputStream dataInputStream = getDataFromEndpoint();

List<byte[]> dataByte = parser.parseNEvents(dataInputStream, n);

// Check that result size is n. Currently just an error is logged. Maybe change to an exception
if (dataByte.size() < n) {
logger.error("Error in HttpStreamProtocol! User required: " + n + " elements but the resource just had: "
+ dataByte.size());
}

for (byte[] b : dataByte) {
result.add(format.parse(b));
}

return result;
}


@Override
public String getId() {
return ID;
Expand All @@ -160,16 +133,9 @@ public InputStream getDataFromEndpoint() throws ParseException {
result = request
.execute().returnContent().asStream();

// if (s.startsWith("ï")) {
// s = s.substring(3);
// }

// result = IOUtils.toInputStream(s, "UTF-8");

} catch (Exception e) {
logger.error("Error while fetching data from URL: " + url, e);
throw new ParseException("Error while fetching data from URL: " + url);
// throw new AdapterException();
}
if (result == null) {
throw new ParseException("Could not receive Data from file: " + url);
Expand Down

0 comments on commit 0b1f23f

Please sign in to comment.