diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f25bf01892..0f327c9152 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -31,6 +31,7 @@ build:
artifacts:
paths:
- ./streampipes-backend/target/*.war
+ - ./streampipes-connect-container/target/*.jar
- ./target/site/apidocs/**
- ./target/mvn_version
expire_in: 1 week
@@ -69,6 +70,21 @@ docker-backend:
only:
- dev
+docker-connect-container:
+ image: docker:17.06.0-ce
+ stage: docker
+ dependencies:
+ - build
+ script:
+ - export MVN_VERSION=$(cat ./target/mvn_version)
+ - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $REGISTRY_HOST
+ - docker login -u riemer -p $HARBOR_PASSWORD laus.fzi.de:8201
+ - docker build --pull -t $IMAGE_NAME/streampipes-connect-container:latest -t $IMAGE_NAME/streampipes-connect-container:$MVN_VERSION -t $HARBOR_IMAGE_NAME/streampipes-connect-container:latest -t $HARBOR_IMAGE_NAME/streampipes-connect-container:$MVN_VERSION ./streampipes-connect-container/
+ - docker push $IMAGE_NAME/streampipes-connect-container:$MVN_VERSION
+ - docker push $IMAGE_NAME/streampipes-connect-container:latest
+ - docker push $HARBOR_IMAGE_NAME/streampipes-connect-container:$MVN_VERSION
+ - docker push $HARBOR_IMAGE_NAME/streampipes-connect-container:latest
+
deploy:
image: maven:3-jdk-8
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000000..99c51136ff
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,38 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+
+
+## [0.60.0] - 2018-11-14
+### Added
+- Beta release of StreamPipes Connect Library
+- Tutorials for better user guidance
+- New Wrapper for the Siddhi CEP engine
+- New Project streampipes-pipeline-elements contains more than 40 new pipeline elements
+
+### Changed
+- Various bug fixes and stability improvements
+- Many UX improvements (e.g., harmonized styles)
+- Dashboard does not reload after new visualization type has been created
+- Improved test coverage
+
+### Removed
+
+## [0.55.2] - 2018-05-08
+### Added
+- The [installer](https://www.github.com/streampipes/streampipes-installer) makes it easy to install StreamPipes on Linux, MacOS and Windows
+- Live data preview for data streams in the pipeline editor
+- Initial support for data sets
+- Default for configurations can now be provided as environment variable, with the same name
+
+### Changed
+- Pipeline elements can be directly installed at installation time
+- Extended the SDK to create new pipeline elements
+- Several UI improvements to make the definition of pipelines more intuitive
+- Several bug fixes and code improvements
+
+### Removed
\ No newline at end of file
diff --git a/README.md b/README.md
index d21572ccec..deec42fb6c 100644
--- a/README.md
+++ b/README.md
@@ -20,15 +20,20 @@ Read the full documentation at [https://docs.streampipes.org](https://docs.strea
### Getting started
It's easy to get started:
-* Download the `docker-compose.yml` file from [https://www.github.com/streampipes/preview-docker](https://www.github.com/streampipes/preview-docker)
-* Follow the installation guide at [https://docs.streampipes.org/quick_start/installation](https://docs.streampipes.org/quick_start/installation)
+* Clone the installer script from [https://www.github.com/streampipes/streampipes-installer](https://www.github.com/streampipes/streampipes-installer)
+* Follow the installation guide at [https://docs.streampipes.org/docs/user-guide-installation](https://docs.streampipes.org/docs/user-guide-installation)
* Check the [tour](https://docs.streampipes.org/user_guide/features) and build your first pipeline!
### Extending StreamPipes
You can easily add your own data streams, processors or sinks.
-Check our developer guide at [https://docs.streampipes.org/developer_guide/introduction](https://docs.streampipes.org/developer_guide/introduction)
+Check our developer guide at [https://docs.streampipes.org/docs/dev-guide-introduction](https://docs.streampipes.org/docs/dev-guide-introduction)
+
+### Community
+
+- [Twitter](https://twitter.com/streampipes)
+- [Email](mailto:feedback@streampipes.org)
### Contributing
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/META-INF/archetype.xml b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/META-INF/archetype.xml
deleted file mode 100644
index 1881e447f2..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/META-INF/archetype.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-
-
-
- streampipes-archetype-storm
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/META-INF/maven/archetype-metadata.xml
deleted file mode 100644
index 24f6bac4b3..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-
-
-
-
-
- src/main/java
-
- **/*.java
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/pom.xml
deleted file mode 100644
index 6f954c049c..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/pom.xml
+++ /dev/null
@@ -1,100 +0,0 @@
-
- 4.0.0
-
- ${groupId}
- ${artifactId}
- ${version}
- jar
-
-
- de.fzi.cep.sepa
- semantic-epa-commons
- 0.0.1-SNAPSHOT
-
-
- org.apache.kafka
- kafka_2.10
-
-
-
-
- de.fzi.cep.sepa
- semantic-epa
- 0.0.1-SNAPSHOT
-
-
- com.espertech
- esper
- 5.2.0
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.1
-
-
- 1.8
- UTF-8
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 2.3
-
-
- package
-
- shade
-
-
-
-
- ${package}.main.Init
-
-
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
- META-INF/maven/com.github.jsonld-java/jsonld-java/pom.xml
- META-INF/maven/com.github.jsonld-java/jsonld-java-sesame/pom.xml
-
-
-
-
-
-
-
-
-
-
-
- laus
- nexus repository
- http://laus.fzi.de/nexus/content/repositories/public/
-
- true
- daily
-
-
- true
-
-
-
-
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/__elementName__/__classNamePrefix__.java b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/__elementName__/__classNamePrefix__.java
deleted file mode 100644
index 1113303cae..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/__elementName__/__classNamePrefix__.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package ${package}.${elementName};
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-import ${package}.esper.EsperEventEngine;
-
-
-public class ${classNamePrefix} extends EsperEventEngine<${classNamePrefix}Parameters>{
-
- private static final Logger logger = LoggerFactory.getLogger(${classNamePrefix}.class.getSimpleName());
-
- protected List statements(final ${classNamePrefix}Parameters params) {
-
- List statements = new ArrayList();
-
- return statements;
-
- }
-}
-
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Controller.java
deleted file mode 100644
index fc8f9f58fd..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Controller.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package ${package}.${elementName};
-
-import com.google.common.io.Resources;
-
-import de.fzi.cep.sepa.commons.exceptions.SepaParseException;
-
-import de.fzi.cep.sepa.desc.EpDeclarer;
-
-import de.fzi.cep.sepa.model.impl.Response;
-import de.fzi.cep.sepa.model.impl.graph.SepaDescription;
-import de.fzi.cep.sepa.model.impl.graph.SepaInvocation;
-
-import de.fzi.cep.sepa.model.util.SepaUtils;
-
-import de.fzi.cep.sepa.client.util.DeclarerUtils;
-
-public class ${classNamePrefix}Controller extends EpDeclarer<${classNamePrefix}Parameters> {
-
- @Override
- public SepaDescription declareModel() {
-
- try {
- return DeclarerUtils.descriptionFromResources(Resources.getResource("${elementName}.jsonLd"), SepaDescription.class);
- } catch (SepaParseException e) {
- e.printStackTrace();
- return null;
- }
-
- }
-
- @Override
- public Response invokeRuntime(SepaInvocation sepa) {
-
- ${classNamePrefix}Parameters staticParams = new ${classNamePrefix}Parameters(sepa);
-
- try {
- invokeEPRuntime(staticParams, ${classNamePrefix}::new, sepa);
- return new Response(sepa.getElementId(), true);
- } catch (Exception e) {
- e.printStackTrace();
- return new Response(sepa.getElementId(), false, e.getMessage());
- }
- }
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Parameters.java b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Parameters.java
deleted file mode 100644
index cca8beaee2..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Parameters.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package ${package}.${elementName};
-
-import de.fzi.cep.sepa.model.impl.graph.SepaInvocation;
-import de.fzi.cep.sepa.runtime.param.BindingParameters;
-
-public class ${classNamePrefix}Parameters extends BindingParameters {
-
- public ${classNamePrefix}Parameters(SepaInvocation graph) {
- super(graph);
- }
-
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/AbstractQueueRunnable.java b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/AbstractQueueRunnable.java
deleted file mode 100644
index 4396561dac..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/AbstractQueueRunnable.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package ${package}.esper;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-public abstract class AbstractQueueRunnable extends Thread
-{
- protected BlockingQueue queue;
- protected long closeAfter = 0;
- protected long currentTimestamp;
- protected boolean autoClose;
- private boolean running;
-
- public AbstractQueueRunnable(int maxQueueSize, int closeAfter)
- {
- queue = new ArrayBlockingQueue(maxQueueSize);
- this.autoClose = true;
- this.closeAfter = closeAfter * 1000;
- this.currentTimestamp = System.currentTimeMillis();
- }
-
- public AbstractQueueRunnable(int maxQueueSize)
- {
- queue = new ArrayBlockingQueue(maxQueueSize);
- this.autoClose = false;
- this.currentTimestamp = System.currentTimeMillis();
- }
-
- @Override
- public void run()
- {
- running = true;
- while (running)
- {
- if (autoClose)
- if (System.currentTimeMillis()-currentTimestamp > closeAfter) break;
- try
- {
- T data = queue.take();
- currentTimestamp = System.currentTimeMillis();
- doNext(data);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- if (e instanceof InterruptedException)
- Thread.currentThread().interrupt();
- else
- e.printStackTrace();
- }
- }
- System.out.println("Interrupted");
- }
-
- public void interrupt()
- {
- running = false;
- }
-
- public void add(T data) throws InterruptedException
- {
- queue.put(data);
- }
-
- protected abstract void doNext(T data) throws Exception;
-}
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/EsperEventEngine.java b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/EsperEventEngine.java
deleted file mode 100644
index a3dcc85e4b..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/EsperEventEngine.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package ${package}.esper;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.espertech.esper.client.ConfigurationException;
-import com.espertech.esper.client.EPServiceProvider;
-import com.espertech.esper.client.EPServiceProviderManager;
-import com.espertech.esper.client.EPStatement;
-import com.espertech.esper.client.EventBean;
-import com.espertech.esper.client.UpdateListener;
-
-import de.fzi.cep.sepa.commons.Utils;
-import de.fzi.cep.sepa.model.impl.graph.SepaInvocation;
-import de.fzi.cep.sepa.runtime.EPEngine;
-import de.fzi.cep.sepa.runtime.OutputCollector;
-import de.fzi.cep.sepa.runtime.param.BindingParameters;
-import de.fzi.cep.sepa.runtime.param.EngineParameters;
-
-public abstract class EsperEventEngine implements EPEngine{
-
- protected EPServiceProvider epService;
- protected List epStatements;
-
- private AbstractQueueRunnable queue;
- private List eventTypeNames = new ArrayList<>();
-
- private static final Logger logger = LoggerFactory.getLogger(EsperEventEngine.class.getSimpleName());
-
- @Override
- public void bind(EngineParameters parameters, OutputCollector collector, SepaInvocation graph) {
- if (parameters.getInEventTypes().size() != graph.getInputStreams().size())
- throw new IllegalArgumentException("Input parameters do not match!");
-
- epService = EPServiceProviderManager.getDefaultProvider();
-
- logger.info("Configuring event types for graph " +graph.getName());
- parameters.getInEventTypes().entrySet().forEach(e -> {
- Map inTypeMap = e.getValue();
- checkAndRegisterEventType(e.getKey(), inTypeMap);
- });
-
- checkAndRegisterEventType("topic://" +graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicName(), parameters.getOutEventType());
-
- List statements = statements(parameters.getStaticProperty());
- registerStatements(statements, collector, parameters.getStaticProperty());
-
- }
-
- private void checkAndRegisterEventType(String key, Map typeMap)
- {
- Map newTypeMap = new HashMap();
- Iterator it = typeMap.keySet().iterator();
- while(it.hasNext())
- {
- String objKey = it.next();
- Object obj = typeMap.get(objKey);
- if (obj instanceof java.util.List)
- {
- String eventName = StringUtils.capitalize(objKey);
- registerEventTypeIfNotExists(eventName, (Map) ((java.util.List) obj).get(0));
- newTypeMap.put(objKey, eventName +"[]");
- }
- else {
- newTypeMap.put(objKey, obj);
- }
- }
- //MapUtils.debugPrint(System.out, key, newTypeMap);
- registerEventTypeIfNotExists(key, newTypeMap);
-
- }
-
- private void registerEventTypeIfNotExists(String eventTypeName, Map typeMap)
- {
- try {
- logger.info("Registering event type, " +eventTypeName);
- epService.getEPAdministrator().getConfiguration().addEventType(eventTypeName, typeMap);
- eventTypeNames.add(eventTypeName);
- } catch (ConfigurationException e)
- {
- e.printStackTrace();
- logger.info("Event type does already exist, " +eventTypeName);
- }
- }
-
- private void registerStatements(List statements, OutputCollector collector, T params)
- {
- toEpStatement(statements);
- queue = new StatementAwareQueue(getWriter(collector, params), 50000);
- queue.start();
- for(EPStatement epStatement : epStatements)
- {
- logger.info("Registering statement " +epStatement.getText());
-
- if (epStatement.getText().startsWith("select"))
- {
- epStatement.addListener(listenerSendingTo(queue));
- }
- epStatement.start();
-
- }
-
- }
-
- private void toEpStatement(List statements)
- {
- if (epStatements == null) epStatements = new ArrayList<>();
- for(String statement : statements)
- {
- epStatements.add(epService.getEPAdministrator().createEPL(statement));
- }
- epStatements.add(epService.getEPAdministrator().createEPL("select * from StatusEvent"));
- }
-
- @Override
- public void onEvent(Map event, String sourceInfo) {
- epService.getEPRuntime().sendEvent(event, sourceInfo);
- }
-
- @Override
- public void discard() {
- logger.info("Removing existing statements");
- for(EPStatement epStatement : epStatements)
- {
- epService.getEPAdministrator().getStatement(epStatement.getName()).removeAllListeners();
- epService.getEPAdministrator().getStatement(epStatement.getName()).stop();
- epService.getEPAdministrator().getStatement(epStatement.getName()).destroy();
- }
- epStatements.clear();
- for(String eventName : eventTypeNames)
- {
- try {
- epService.getEPAdministrator().getConfiguration().removeEventType(eventName, false);
- } catch (ConfigurationException ce)
- {
- logger.info("Event type used in another statement which is still running, skipping...");
- }
- }
-
- queue.interrupt();
- }
-
- private static UpdateListener listenerSendingTo(AbstractQueueRunnable queue) {
- return new UpdateListener() {
-
- @Override
- public void update(EventBean[] newEvents, EventBean[] oldEvents) {
- try {
- if (newEvents != null) queue.add(newEvents);
- else queue.add(oldEvents);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- }
-
- protected abstract List statements(final T bindingParameters);
-
- protected String fixEventName(String eventName)
- {
- return "`" +eventName +"`";
- }
-
- protected List makeStatementList(String statement)
- {
- return Utils.createList(statement);
- }
-
- protected Writer getWriter(OutputCollector collector, T params)
- {
- return new SEPAWriter(collector);
- }
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/SEPAWriter.java b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/SEPAWriter.java
deleted file mode 100644
index 79c50bc2b7..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/SEPAWriter.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package ${package}.esper;
-
-import com.espertech.esper.client.EventBean;
-
-import de.fzi.cep.sepa.runtime.OutputCollector;
-
-public class SEPAWriter implements Writer {
-
- private OutputCollector collector;
-
- public SEPAWriter(OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void onEvent(EventBean bean) {
- collector.send(bean.getUnderlying());
- }
-
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/StatementAwareQueue.java b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/StatementAwareQueue.java
deleted file mode 100644
index c2d5378620..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/StatementAwareQueue.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package ${package}.esper;
-
-import com.espertech.esper.client.EventBean;
-
-public class StatementAwareQueue extends AbstractQueueRunnable{
-
- private int counter = 0;
- private Writer writer;
-
- public StatementAwareQueue(Writer writer, int maxQueueSize, int closeAfter) {
- super(maxQueueSize, closeAfter);
- this.writer = writer;
- }
-
- public StatementAwareQueue(Writer writer, int maxQueueSize) {
- super(maxQueueSize);
- this.writer = writer;
- }
-
- @Override
- protected void doNext(EventBean[] newEvents) throws Exception {
- currentTimestamp = System.currentTimeMillis();
- counter++;
- if (counter % 100000 == 0) System.out.println(counter + " Events received.");
- writer.onEvent(newEvents[0]);
- }
-
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/Writer.java b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/Writer.java
deleted file mode 100644
index 681ed064ab..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/esper/Writer.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package ${package}.esper;
-
-import com.espertech.esper.client.EventBean;
-
-public interface Writer {
-
- public void onEvent(EventBean bean);
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/main/Init.java
deleted file mode 100644
index bf0048a7c2..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/src/main/resources/archetype-resources/src/main/java/main/Init.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package ${package}.main;
-
-import java.util.Arrays;
-
-import de.fzi.cep.sepa.desc.ModelSubmitter;
-
-import ${package}.${elementName}.${classNamePrefix}Controller;
-
-public class Init {
-
- public static void main(String[] args)
- {
- try {
- ModelSubmitter.submitAgent(Arrays.asList(new ${classNamePrefix}Controller()));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/MANIFEST.MF b/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/MANIFEST.MF
deleted file mode 100644
index eaca22bf14..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,5 +0,0 @@
-Manifest-Version: 1.0
-Build-Jdk: 1.7.0_101
-Built-By: philipp
-Created-By: Maven Integration for Eclipse
-
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/archetype.xml b/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/archetype.xml
deleted file mode 100644
index 1881e447f2..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/archetype.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-
-
-
- streampipes-archetype-storm
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/maven/archetype-metadata.xml
deleted file mode 100644
index 24f6bac4b3..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/maven/archetype-metadata.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-
-
-
-
-
- src/main/java
-
- **/*.java
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/maven/de.fzi.cep.sepa/streampipes-archetype-esper-standalone/pom.properties b/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/maven/de.fzi.cep.sepa/streampipes-archetype-esper-standalone/pom.properties
deleted file mode 100644
index 77bdefec8a..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/maven/de.fzi.cep.sepa/streampipes-archetype-esper-standalone/pom.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Copyright 2018 FZI Forschungszentrum Informatik
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-#Generated by Maven Integration for Eclipse
-#Thu May 05 14:09:02 CEST 2016
-version=0.0.1-SNAPSHOT
-groupId=de.fzi.cep.sepa
-m2e.projectName=streampipes-archetype-esper-standalone
-m2e.projectLocation=/home/philipp/Coding/fzi/icep/semantic-epa-parent/archetypes/streampipes-archetype-esper-standalone
-artifactId=streampipes-archetype-esper-standalone
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/maven/de.fzi.cep.sepa/streampipes-archetype-esper-standalone/pom.xml b/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/maven/de.fzi.cep.sepa/streampipes-archetype-esper-standalone/pom.xml
deleted file mode 100644
index 8c3ffb21a4..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/META-INF/maven/de.fzi.cep.sepa/streampipes-archetype-esper-standalone/pom.xml
+++ /dev/null
@@ -1,23 +0,0 @@
-
-
-
- 4.0.0
- de.fzi.cep.sepa
- streampipes-archetype-esper-standalone
- 0.0.1-SNAPSHOT
-
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/pom.xml b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/pom.xml
deleted file mode 100644
index f26630054f..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/pom.xml
+++ /dev/null
@@ -1,117 +0,0 @@
-
-
-
- 4.0.0
-
- ${groupId}
- ${artifactId}
- ${version}
- jar
-
-
- de.fzi.cep.sepa
- semantic-epa-commons
- 0.0.1-SNAPSHOT
-
-
- org.apache.kafka
- kafka_2.10
-
-
-
-
- de.fzi.cep.sepa
- semantic-epa
- 0.0.1-SNAPSHOT
-
-
- com.espertech
- esper
- 5.2.0
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.1
-
-
- 1.8
- UTF-8
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 2.3
-
-
- package
-
- shade
-
-
-
-
- ${package}.main.Init
-
-
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
- META-INF/maven/com.github.jsonld-java/jsonld-java/pom.xml
- META-INF/maven/com.github.jsonld-java/jsonld-java-sesame/pom.xml
-
-
-
-
-
-
-
-
-
-
-
- laus
- nexus repository
- http://laus.fzi.de/nexus/content/repositories/public/
-
- true
- daily
-
-
- true
-
-
-
-
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/__elementName__/__classNamePrefix__.java b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/__elementName__/__classNamePrefix__.java
deleted file mode 100644
index 1113303cae..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/__elementName__/__classNamePrefix__.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package ${package}.${elementName};
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-import ${package}.esper.EsperEventEngine;
-
-
-public class ${classNamePrefix} extends EsperEventEngine<${classNamePrefix}Parameters>{
-
- private static final Logger logger = LoggerFactory.getLogger(${classNamePrefix}.class.getSimpleName());
-
- protected List statements(final ${classNamePrefix}Parameters params) {
-
- List statements = new ArrayList();
-
- return statements;
-
- }
-}
-
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Controller.java
deleted file mode 100644
index fc8f9f58fd..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Controller.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package ${package}.${elementName};
-
-import com.google.common.io.Resources;
-
-import de.fzi.cep.sepa.commons.exceptions.SepaParseException;
-
-import de.fzi.cep.sepa.desc.EpDeclarer;
-
-import de.fzi.cep.sepa.model.impl.Response;
-import de.fzi.cep.sepa.model.impl.graph.SepaDescription;
-import de.fzi.cep.sepa.model.impl.graph.SepaInvocation;
-
-import de.fzi.cep.sepa.model.util.SepaUtils;
-
-import de.fzi.cep.sepa.client.util.DeclarerUtils;
-
-public class ${classNamePrefix}Controller extends EpDeclarer<${classNamePrefix}Parameters> {
-
- @Override
- public SepaDescription declareModel() {
-
- try {
- return DeclarerUtils.descriptionFromResources(Resources.getResource("${elementName}.jsonLd"), SepaDescription.class);
- } catch (SepaParseException e) {
- e.printStackTrace();
- return null;
- }
-
- }
-
- @Override
- public Response invokeRuntime(SepaInvocation sepa) {
-
- ${classNamePrefix}Parameters staticParams = new ${classNamePrefix}Parameters(sepa);
-
- try {
- invokeEPRuntime(staticParams, ${classNamePrefix}::new, sepa);
- return new Response(sepa.getElementId(), true);
- } catch (Exception e) {
- e.printStackTrace();
- return new Response(sepa.getElementId(), false, e.getMessage());
- }
- }
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Parameters.java b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Parameters.java
deleted file mode 100644
index cca8beaee2..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/__elementName__/__classNamePrefix__Parameters.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package ${package}.${elementName};
-
-import de.fzi.cep.sepa.model.impl.graph.SepaInvocation;
-import de.fzi.cep.sepa.runtime.param.BindingParameters;
-
-public class ${classNamePrefix}Parameters extends BindingParameters {
-
- public ${classNamePrefix}Parameters(SepaInvocation graph) {
- super(graph);
- }
-
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/AbstractQueueRunnable.java b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/AbstractQueueRunnable.java
deleted file mode 100644
index 4396561dac..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/AbstractQueueRunnable.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package ${package}.esper;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-public abstract class AbstractQueueRunnable extends Thread
-{
- protected BlockingQueue queue;
- protected long closeAfter = 0;
- protected long currentTimestamp;
- protected boolean autoClose;
- private boolean running;
-
- public AbstractQueueRunnable(int maxQueueSize, int closeAfter)
- {
- queue = new ArrayBlockingQueue(maxQueueSize);
- this.autoClose = true;
- this.closeAfter = closeAfter * 1000;
- this.currentTimestamp = System.currentTimeMillis();
- }
-
- public AbstractQueueRunnable(int maxQueueSize)
- {
- queue = new ArrayBlockingQueue(maxQueueSize);
- this.autoClose = false;
- this.currentTimestamp = System.currentTimeMillis();
- }
-
- @Override
- public void run()
- {
- running = true;
- while (running)
- {
- if (autoClose)
- if (System.currentTimeMillis()-currentTimestamp > closeAfter) break;
- try
- {
- T data = queue.take();
- currentTimestamp = System.currentTimeMillis();
- doNext(data);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- if (e instanceof InterruptedException)
- Thread.currentThread().interrupt();
- else
- e.printStackTrace();
- }
- }
- System.out.println("Interrupted");
- }
-
- public void interrupt()
- {
- running = false;
- }
-
- public void add(T data) throws InterruptedException
- {
- queue.put(data);
- }
-
- protected abstract void doNext(T data) throws Exception;
-}
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/EsperEventEngine.java b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/EsperEventEngine.java
deleted file mode 100644
index a3dcc85e4b..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/EsperEventEngine.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package ${package}.esper;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.espertech.esper.client.ConfigurationException;
-import com.espertech.esper.client.EPServiceProvider;
-import com.espertech.esper.client.EPServiceProviderManager;
-import com.espertech.esper.client.EPStatement;
-import com.espertech.esper.client.EventBean;
-import com.espertech.esper.client.UpdateListener;
-
-import de.fzi.cep.sepa.commons.Utils;
-import de.fzi.cep.sepa.model.impl.graph.SepaInvocation;
-import de.fzi.cep.sepa.runtime.EPEngine;
-import de.fzi.cep.sepa.runtime.OutputCollector;
-import de.fzi.cep.sepa.runtime.param.BindingParameters;
-import de.fzi.cep.sepa.runtime.param.EngineParameters;
-
-public abstract class EsperEventEngine implements EPEngine{
-
- protected EPServiceProvider epService;
- protected List epStatements;
-
- private AbstractQueueRunnable queue;
- private List eventTypeNames = new ArrayList<>();
-
- private static final Logger logger = LoggerFactory.getLogger(EsperEventEngine.class.getSimpleName());
-
- @Override
- public void bind(EngineParameters parameters, OutputCollector collector, SepaInvocation graph) {
- if (parameters.getInEventTypes().size() != graph.getInputStreams().size())
- throw new IllegalArgumentException("Input parameters do not match!");
-
- epService = EPServiceProviderManager.getDefaultProvider();
-
- logger.info("Configuring event types for graph " +graph.getName());
- parameters.getInEventTypes().entrySet().forEach(e -> {
- Map inTypeMap = e.getValue();
- checkAndRegisterEventType(e.getKey(), inTypeMap);
- });
-
- checkAndRegisterEventType("topic://" +graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicName(), parameters.getOutEventType());
-
- List statements = statements(parameters.getStaticProperty());
- registerStatements(statements, collector, parameters.getStaticProperty());
-
- }
-
- private void checkAndRegisterEventType(String key, Map typeMap)
- {
- Map newTypeMap = new HashMap();
- Iterator it = typeMap.keySet().iterator();
- while(it.hasNext())
- {
- String objKey = it.next();
- Object obj = typeMap.get(objKey);
- if (obj instanceof java.util.List)
- {
- String eventName = StringUtils.capitalize(objKey);
- registerEventTypeIfNotExists(eventName, (Map) ((java.util.List) obj).get(0));
- newTypeMap.put(objKey, eventName +"[]");
- }
- else {
- newTypeMap.put(objKey, obj);
- }
- }
- //MapUtils.debugPrint(System.out, key, newTypeMap);
- registerEventTypeIfNotExists(key, newTypeMap);
-
- }
-
- private void registerEventTypeIfNotExists(String eventTypeName, Map typeMap)
- {
- try {
- logger.info("Registering event type, " +eventTypeName);
- epService.getEPAdministrator().getConfiguration().addEventType(eventTypeName, typeMap);
- eventTypeNames.add(eventTypeName);
- } catch (ConfigurationException e)
- {
- e.printStackTrace();
- logger.info("Event type does already exist, " +eventTypeName);
- }
- }
-
- private void registerStatements(List statements, OutputCollector collector, T params)
- {
- toEpStatement(statements);
- queue = new StatementAwareQueue(getWriter(collector, params), 50000);
- queue.start();
- for(EPStatement epStatement : epStatements)
- {
- logger.info("Registering statement " +epStatement.getText());
-
- if (epStatement.getText().startsWith("select"))
- {
- epStatement.addListener(listenerSendingTo(queue));
- }
- epStatement.start();
-
- }
-
- }
-
- private void toEpStatement(List statements)
- {
- if (epStatements == null) epStatements = new ArrayList<>();
- for(String statement : statements)
- {
- epStatements.add(epService.getEPAdministrator().createEPL(statement));
- }
- epStatements.add(epService.getEPAdministrator().createEPL("select * from StatusEvent"));
- }
-
- @Override
- public void onEvent(Map event, String sourceInfo) {
- epService.getEPRuntime().sendEvent(event, sourceInfo);
- }
-
- @Override
- public void discard() {
- logger.info("Removing existing statements");
- for(EPStatement epStatement : epStatements)
- {
- epService.getEPAdministrator().getStatement(epStatement.getName()).removeAllListeners();
- epService.getEPAdministrator().getStatement(epStatement.getName()).stop();
- epService.getEPAdministrator().getStatement(epStatement.getName()).destroy();
- }
- epStatements.clear();
- for(String eventName : eventTypeNames)
- {
- try {
- epService.getEPAdministrator().getConfiguration().removeEventType(eventName, false);
- } catch (ConfigurationException ce)
- {
- logger.info("Event type used in another statement which is still running, skipping...");
- }
- }
-
- queue.interrupt();
- }
-
- private static UpdateListener listenerSendingTo(AbstractQueueRunnable queue) {
- return new UpdateListener() {
-
- @Override
- public void update(EventBean[] newEvents, EventBean[] oldEvents) {
- try {
- if (newEvents != null) queue.add(newEvents);
- else queue.add(oldEvents);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- }
-
- protected abstract List statements(final T bindingParameters);
-
- protected String fixEventName(String eventName)
- {
- return "`" +eventName +"`";
- }
-
- protected List makeStatementList(String statement)
- {
- return Utils.createList(statement);
- }
-
- protected Writer getWriter(OutputCollector collector, T params)
- {
- return new SEPAWriter(collector);
- }
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/SEPAWriter.java b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/SEPAWriter.java
deleted file mode 100644
index 79c50bc2b7..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/SEPAWriter.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package ${package}.esper;
-
-import com.espertech.esper.client.EventBean;
-
-import de.fzi.cep.sepa.runtime.OutputCollector;
-
-public class SEPAWriter implements Writer {
-
- private OutputCollector collector;
-
- public SEPAWriter(OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void onEvent(EventBean bean) {
- collector.send(bean.getUnderlying());
- }
-
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/StatementAwareQueue.java b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/StatementAwareQueue.java
deleted file mode 100644
index c2d5378620..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/StatementAwareQueue.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package ${package}.esper;
-
-import com.espertech.esper.client.EventBean;
-
-public class StatementAwareQueue extends AbstractQueueRunnable{
-
- private int counter = 0;
- private Writer writer;
-
- public StatementAwareQueue(Writer writer, int maxQueueSize, int closeAfter) {
- super(maxQueueSize, closeAfter);
- this.writer = writer;
- }
-
- public StatementAwareQueue(Writer writer, int maxQueueSize) {
- super(maxQueueSize);
- this.writer = writer;
- }
-
- @Override
- protected void doNext(EventBean[] newEvents) throws Exception {
- currentTimestamp = System.currentTimeMillis();
- counter++;
- if (counter % 100000 == 0) System.out.println(counter + " Events received.");
- writer.onEvent(newEvents[0]);
- }
-
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/Writer.java b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/Writer.java
deleted file mode 100644
index 681ed064ab..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/esper/Writer.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package ${package}.esper;
-
-import com.espertech.esper.client.EventBean;
-
-public interface Writer {
-
- public void onEvent(EventBean bean);
-}
diff --git a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/main/Init.java
deleted file mode 100644
index bf0048a7c2..0000000000
--- a/archetypes/streampipes-archetype-esper-standalone/target/classes/archetype-resources/src/main/java/main/Init.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package ${package}.main;
-
-import java.util.Arrays;
-
-import de.fzi.cep.sepa.desc.ModelSubmitter;
-
-import ${package}.${elementName}.${classNamePrefix}Controller;
-
-public class Init {
-
- public static void main(String[] args)
- {
- try {
- ModelSubmitter.submitAgent(Arrays.asList(new ${classNamePrefix}Controller()));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/archetypes/streampipes-archetype-pe-jvm/src/main/resources/archetype-resources/Dockerfile b/archetypes/streampipes-archetype-pe-jvm/src/main/resources/archetype-resources/Dockerfile
deleted file mode 100644
index f024d3ba43..0000000000
--- a/archetypes/streampipes-archetype-pe-jvm/src/main/resources/archetype-resources/Dockerfile
+++ /dev/null
@@ -1,11 +0,0 @@
-FROM anapsix/alpine-java
-
-ENV CONSUL_LOCATION consul
-
-EXPOSE 8090
-
-RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
-
-ADD target/${artifactId}.jar /${artifactId}.jar
-
-ENTRYPOINT ["java", "-jar", "/${artifactId}.jar"]
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/README.md b/archetypes/streampipes-archetype-pe-processors-flink/README.md
new file mode 100644
index 0000000000..0f2f5b3689
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-processors-flink/README.md
@@ -0,0 +1,18 @@
+## StreamPipes Maven Archetype for Flink-based Data Processors
+
+### Usage
+
+mvn archetype:generate \
+ -DarchetypeGroupId=org.streampipes \
+ -DarchetypeArtifactId=streampipes-archetype-pe-processors-flink \
+ -DarchetypeVersion=0.55.3-SNAPSHOT \
+ -DgroupId=my.test.groupId \
+ -DartifactId=my-test-artifact-id
+ -DclassNamePrefix=MyProcessor
+ -DpackageName=mypackagename
+
+### Variables
+
+* classNamePrefix: Will be used as a prefix to name your controller & parameter classes
+* packageName: Will be used as the package name
+
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/pom.xml
new file mode 100644
index 0000000000..8791c5a179
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-processors-flink/pom.xml
@@ -0,0 +1,59 @@
+
+
+ 4.0.0
+
+ org.streampipes
+ streampipes-parent
+ 0.60.0
+ ../../pom.xml
+
+ streampipes-archetype-pe-processors-flink
+ maven-archetype
+
+ streampipes-archetype-pe-processors-flink
+
+
+
+
+ org.apache.maven.archetype
+ archetype-packaging
+ 3.0.1
+
+
+
+
+
+
+ maven-archetype-plugin
+ 3.0.1
+
+
+
+
+
+
+
+ scm:git:ssh://git@ipe-wim-gitlab.fzi.de:2222/streampipes/ce-backend.git/archetypes/streampipes-archetype-pe-sinks-jvm
+
+
+
+
+
+ deployment
+ Internal Releases
+ https://laus.fzi.de/nexus/content/repositories/public/
+
+
+
+
+ deployment
+ Internal Releases
+ https://laus.fzi.de/nexus/content/repositories/releases/
+
+
+ deployment
+ Internal Releases
+ https://laus.fzi.de/nexus/content/repositories/snapshots/
+
+
+
diff --git a/archetypes/streampipes-archetype-pe-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml
similarity index 96%
rename from archetypes/streampipes-archetype-pe-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml
rename to archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml
index f562ab08d5..c309cbf6b0 100644
--- a/archetypes/streampipes-archetype-pe-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -28,7 +28,7 @@
Example
-
+
example
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/Dockerfile b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/Dockerfile
new file mode 100644
index 0000000000..06f89bbaae
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/Dockerfile
@@ -0,0 +1,8 @@
+FROM anapsix/alpine-java:8
+
+EXPOSE 8090
+ENV CONSUL_LOCATION consul
+
+COPY ./target/${artifactId}.jar /streampipes-processing-element-container.jar
+
+ENTRYPOINT ["java", "-jar", "/streampipes-processing-element-container.jar"]
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000000..5a80f20dd8
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,154 @@
+
+
+ 4.0.0
+ ${groupId}
+ ${artifactId}
+ ${version}
+
+
+ 0.55.3-SNAPSHOT
+
+
+
+
+ org.streampipes
+ streampipes-container-standalone
+ ${sp.version}
+
+
+ org.apache.kafka
+ kafka_2.10
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+
+
+ org.streampipes
+ streampipes-commons
+ ${sp.version}
+
+
+ org.apache.kafka
+ kafka_2.10
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+
+
+ org.streampipes
+ streampipes-wrapper-flink
+ ${sp.version}
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+ commons-beanutils
+ commons-beanutils
+ 1.9.2
+
+
+ org.streampipes
+ streampipes-sdk
+ ${sp.version}
+
+
+ org.apache.logging.log4j
+ log4j-to-slf4j
+ 2.8.2
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.24
+
+
+ org.streampipes
+ streampipes-config
+ ${sp.version}
+
+
+
+
+
+ laus
+ nexus repository
+ http://laus.fzi.de/nexus/content/repositories/public/
+
+ true
+ daily
+
+
+ true
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 2.3
+
+
+ package
+
+ shade
+
+
+ false
+
+
+ ${package}.main.Init
+
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+ META-INF/maven/com.github.jsonld-java/jsonld-java/pom.xml
+ META-INF/maven/com.github.jsonld-java/jsonld-java-sesame/pom.xml
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+
+ 1.8
+ UTF-8
+
+
+
+ ${artifactId}
+
+
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
new file mode 100644
index 0000000000..e742629bca
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
@@ -0,0 +1,84 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}.config;
+
+import org.streampipes.config.SpConfig;
+import org.streampipes.container.model.PeConfig;
+
+public enum Config implements PeConfig {
+
+ INSTANCE;
+
+ private SpConfig config;
+ public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
+
+ private final static String service_id = "pe/${package}";
+ private final static String service_name = "${packageName}";
+ private final static String service_container_name = "${artifactId}";
+
+
+ Config() {
+ config = SpConfig.getSpConfig(service_id);
+
+ config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component");
+ config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
+ config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
+ config.register(ConfigKeys.FLINK_PORT, 6123, "Port for the flink cluster");
+ config.register(ConfigKeys.ICON_HOST, "backend", "Hostname for the icon host");
+ config.register(ConfigKeys.ICON_PORT, 80, "Port for the icons in nginx");
+
+ config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
+
+ config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
+
+ }
+
+ @Override
+ public String getHost() {
+ return config.getString(ConfigKeys.HOST);
+ }
+
+ @Override
+ public int getPort() {
+ return config.getInteger(ConfigKeys.PORT);
+ }
+
+ public String getFlinkHost() {
+ return config.getString(ConfigKeys.FLINK_HOST);
+ }
+
+ public int getFlinkPort() {
+ return config.getInteger(ConfigKeys.FLINK_PORT);
+ }
+
+ public static final String iconBaseUrl = "http://" + Config.INSTANCE.getIconHost() + ":" +
+ Config.INSTANCE.getIconPort() + "/assets/img/pe_icons";
+
+ public static final String getIconUrl(String pictureName) {
+ return iconBaseUrl + "/" + pictureName + ".png";
+ }
+
+ public String getIconHost() {
+ return config.getString(ConfigKeys.ICON_HOST);
+ }
+
+ public int getIconPort() {
+ return config.getInteger(ConfigKeys.ICON_PORT);
+ }
+
+ public boolean getDebug() {
+ return config.getBoolean(ConfigKeys.DEBUG);
+ }
+
+ @Override
+ public String getId() {
+ return service_id;
+ }
+
+ @Override
+ public String getName() {
+ return config.getString(ConfigKeys.SERVICE_NAME);
+ }
+
+}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
new file mode 100644
index 0000000000..4110b02b99
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -0,0 +1,15 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}.config;
+
+public class ConfigKeys {
+ final static String HOST = "SP_HOST";
+ final static String PORT = "SP_PORT";
+ final static String FLINK_HOST = "SP_FLINK_HOST";
+ final static String FLINK_PORT = "SP_FLINK_PORT";
+ final static String ICON_HOST = "SP_ICON_HOST";
+ final static String ICON_PORT = "SP_ICON_PORT";
+ final static String SERVICE_NAME = "SP_SERVICE_NAME";
+ final static String DEBUG = "SP_FLINK_DEBUG";
+}
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java
new file mode 100644
index 0000000000..628bce1d70
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java
@@ -0,0 +1,23 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}.main;
+
+import org.streampipes.container.init.DeclarersSingleton;
+import org.streampipes.container.standalone.init.StandaloneModelSubmitter;
+
+import ${package}.config.Config;
+import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller;
+
+public class Init extends StandaloneModelSubmitter {
+
+ public static void main(String[] args) throws Exception {
+ DeclarersSingleton.getInstance()
+ .add(new ${classNamePrefix}Controller());
+
+ new Init().init(Config.INSTANCE);
+
+ }
+
+
+}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java
new file mode 100644
index 0000000000..013c3d49b2
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java
@@ -0,0 +1,21 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+
+package ${package}.pe.processor.${packageName};
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Map;
+
+public class ${classNamePrefix} implements FlatMapFunction