Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1077] Remove legacy method getNElements #1078

Merged
merged 1 commit into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventSchema;

import java.util.List;
import java.util.Map;

public interface IProtocol extends Connector {

IProtocol getInstance(ProtocolDescription protocolDescription,
Expand All @@ -36,8 +33,6 @@ IProtocol getInstance(ProtocolDescription protocolDescription,

GuessSchema getGuessSchema() throws ParseException;

List<Map<String, Object>> getNElements(int n) throws ParseException;

void run(IAdapterPipeline adapterPipeline) throws AdapterException;

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FileProtocol extends Protocol {

Expand Down Expand Up @@ -134,32 +132,6 @@ public GuessSchema getGuessSchema() throws ParseException {
}


@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
List<Map<String, Object>> result = new ArrayList<>();

List<byte[]> dataByteArray = new ArrayList<>();
try {
InputStream dataInputStream = FileProtocolUtils.getFileInputStream(this.selectedFilename);
dataByteArray = parser.parseNEvents(dataInputStream, n);
} catch (FileNotFoundException e) {
e.printStackTrace();
}

// Check that result size is n. Currently just an error is logged. Maybe change to an exception
if (dataByteArray.size() < n) {
logger.error("Error in File Protocol! User required: " + n + " elements but the resource just had: "
+ dataByteArray.size());
}

for (byte[] b : dataByteArray) {
result.add(format.parse(b));
}

return result;
}


@Override
public String getId() {
return ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HttpProtocol extends Protocol {

Expand Down Expand Up @@ -122,27 +120,6 @@ public GuessSchema getGuessSchema() throws ParseException {
return result;
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {

List<Map<String, Object>> result = new ArrayList<>();

InputStream dataInputStream = getDataFromEndpoint();

List<byte[]> dataByteArray = parser.parseNEvents(dataInputStream, n);

// Check that result size is n. Currently just an error is logged. Maybe change to an exception
if (dataByteArray.size() < n) {
logger.error("Error in HttpProtocol! User required: " + n + " elements but the resource just had: "
+ dataByteArray.size());
}

for (byte[] b : dataByteArray) {
result.add(format.parse(b));
}

return result;
}

public InputStream getDataFromEndpoint() throws ParseException {
InputStream result = null;
Expand All @@ -153,12 +130,6 @@ public InputStream getDataFromEndpoint() throws ParseException {
.socketTimeout(100000)
.execute().returnContent().asStream();

// if (s.startsWith("ï")) {
// s = s.substring(3);
// }

// result = IOUtils.toInputStream(s, "UTF-8");

} catch (IOException e) {
throw new ParseException("Could not receive Data from: " + url);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import org.apache.streampipes.extensions.management.connect.adapter.model.generic.Protocol;
import org.apache.streampipes.model.connect.guess.GuessSchema;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public abstract class BrokerProtocol extends Protocol {

Expand Down Expand Up @@ -55,17 +53,6 @@ public GuessSchema getGuessSchema() throws ParseException {
}
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
List<byte[]> resultEventsByte = getNByteElements(n);
List<Map<String, Object>> result = new ArrayList<>();
for (byte[] event : resultEventsByte) {
result.add(format.parse(event));
}

return result;
}

protected abstract List<byte[]> getNByteElements(int n) throws ParseException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class FileStreamProtocol extends Protocol {

private static Logger logger = LoggerFactory.getLogger(FileStreamProtocol.class);

public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.file";

//private String filePath;
private String selectedFileName;
// private String timestampKey;
private boolean replaceTimestamp;
private float speedUp;
private int timeBetweenReplay;
Expand Down Expand Up @@ -165,7 +162,6 @@ public Protocol getInstance(ProtocolDescription protocolDescription, IParser par
StaticPropertyExtractor.from(protocolDescription.getConfig(), new ArrayList<>());

List<String> replaceTimestampStringList = extractor.selectedMultiValues("replaceTimestamp", String.class);
// String replaceTimestampString = extractor.selectedSingleValueOption("replaceTimestamp");
boolean replaceTimestamp = replaceTimestampStringList.size() != 0;

float speedUp = extractor.singleValueParameter("speed", Float.class);
Expand Down Expand Up @@ -209,8 +205,6 @@ public ProtocolDescription declareModel() {
.sourceType(AdapterSourceType.STREAM)
.category(AdapterType.Generic)
.requiredFile(Labels.withId("filePath"), Filetypes.CSV, Filetypes.JSON, Filetypes.XML)
// .requiredSingleValueSelection(Labels.withId("replaceTimestamp"),
// Options.from("True", "False"))
.requiredMultiValueSelection(Labels.withId("replaceTimestamp"),
Options.from(""))
.requiredFloatParameter(Labels.withId("speed"))
Expand All @@ -231,27 +225,6 @@ public GuessSchema getGuessSchema() throws ParseException {
}
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
List<Map<String, Object>> result = new ArrayList<>();

InputStream dataInputStream = getDataFromEndpoint();

List<byte[]> dataByteArray = parser.parseNEvents(dataInputStream, n);

// Check that result size is n. Currently, just an error is logged. Maybe change to an exception
if (dataByteArray.size() < n) {
logger.error("Error in File Protocol! User required: " + n + " elements but the resource just had: "
+ dataByteArray.size());
}

for (byte[] b : dataByteArray) {
result.add(format.parse(b));
}

return result;
}


@Override
public String getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HttpServerProtocol extends Protocol {

Expand Down Expand Up @@ -153,11 +151,6 @@ private String extractRuntimeType(String type) {
}
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
return null;
}

@Override
public void run(IAdapterPipeline adapterPipeline) {
SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HttpStreamProtocol extends PullProtocol {

Expand All @@ -51,7 +49,6 @@ public class HttpStreamProtocol extends PullProtocol {

private static final String URL_PROPERTY = "url";
private static final String INTERVAL_PROPERTY = "interval";
private static final String ACCESS_TOKEN_PROPERTY = "access_token";

private String url;
private String accessToken;
Expand Down Expand Up @@ -93,8 +90,6 @@ public ProtocolDescription declareModel() {
.category(AdapterType.Generic)
.requiredTextParameter(Labels.withId(URL_PROPERTY))
.requiredIntegerParameter(Labels.withId(INTERVAL_PROPERTY))
//.requiredTextParameter(Labels.from(ACCESS_TOKEN_PROPERTY, "Access Token", "Http
// Access Token"))
.build();
}

Expand All @@ -117,28 +112,6 @@ public GuessSchema getGuessSchema() throws ParseException {
return result;
}

@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
List<Map<String, Object>> result = new ArrayList<>();

InputStream dataInputStream = getDataFromEndpoint();

List<byte[]> dataByte = parser.parseNEvents(dataInputStream, n);

// Check that result size is n. Currently just an error is logged. Maybe change to an exception
if (dataByte.size() < n) {
logger.error("Error in HttpStreamProtocol! User required: " + n + " elements but the resource just had: "
+ dataByte.size());
}

for (byte[] b : dataByte) {
result.add(format.parse(b));
}

return result;
}


@Override
public String getId() {
return ID;
Expand All @@ -160,16 +133,9 @@ public InputStream getDataFromEndpoint() throws ParseException {
result = request
.execute().returnContent().asStream();

// if (s.startsWith("ï")) {
// s = s.substring(3);
// }

// result = IOUtils.toInputStream(s, "UTF-8");

} catch (Exception e) {
logger.error("Error while fetching data from URL: " + url, e);
throw new ParseException("Error while fetching data from URL: " + url);
// throw new AdapterException();
}
if (result == null) {
throw new ParseException("Could not receive Data from file: " + url);
Expand Down