diff --git a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml
index dd9ec81cb8..b71b32cfbe 100644
--- a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml
@@ -44,6 +44,11 @@
streampipes-sources
${sp.version}
+
+ org.slf4j
+ slf4j-api
+ 2.0.6
+
org.junit.jupiter
junit-jupiter-engine
@@ -62,7 +67,7 @@
org.springframework.boot
spring-boot-maven-plugin
- 2.6.2
+ 3.0.1
@@ -77,10 +82,9 @@
org.apache.maven.plugins
maven-compiler-plugin
- 3.1
+ 3.10.1
- 1.8
- 1.8
+ 17
UTF-8
diff --git a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
index 4863966d15..777e6dc314 100644
--- a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
+++ b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -21,16 +21,16 @@
#set( $symbol_escape = '\' )
package ${package};
-import org.apache.streampipes.container.extensions.ExtensionsModelSubmitter;
-import org.apache.streampipes.container.model.SpServiceDefinition;
-import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import ${package}.pe.${packageName}.${classNamePrefix}DataProcessor;
import ${package}.pe.${packageName}.${classNamePrefix}DataSink;
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
index 254fad6efe..664c54d09d 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
@@ -31,22 +31,7 @@
org.apache.streampipes
- streampipes-container-standalone
- ${sp.version}
-
-
- org.apache.kafka
- kafka_2.10
-
-
- org.apache.kafka
- kafka-clients
-
-
-
-
- org.apache.streampipes
- streampipes-commons
+ streampipes-service-extensions
${sp.version}
@@ -92,48 +77,13 @@
org.slf4j
slf4j-api
- 1.7.24
+ 2.0.6
org.apache.streampipes
streampipes-config
${sp.version}
-
- org.apache.streampipes
- streampipes-dataformat-json
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-dataformat-cbor
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-dataformat-smile
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-dataformat-fst
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-messaging-jms
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-messaging-kafka
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-messaging-mqtt
- ${sp.version}
-
@@ -146,7 +96,7 @@
org.springframework.boot
spring-boot-maven-plugin
- 2.4.1
+ 3.0.1
@@ -202,10 +152,9 @@
org.apache.maven.plugins
maven-compiler-plugin
- 3.1
+ 3.10.1
- 1.8
- 1.8
+ 17
UTF-8
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
index 8b05fdfe31..dc52194fbe 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -21,11 +21,6 @@
package ${package};
-import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
-import org.apache.streampipes.container.model.SpServiceDefinition;
-import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
-
import ${package}.config.ConfigKeys;
import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller;
@@ -34,11 +29,14 @@
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
-public class Init extends StandaloneModelSubmitter {
+public class Init extends ExtensionsModelSubmitter {
public static void main(String[] args) throws Exception {
new Init().init();
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
index 981a6a8c92..dc810a0ce7 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
@@ -21,10 +21,10 @@
package ${package}.pe.processor.${packageName};
+import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.client.StreamPipesClient;
-import org.apache.streampipes.container.config.ConfigExtractor;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
index 929e3a87a2..fcad195dc9 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
@@ -26,7 +26,7 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.streampipes.client.StreamPipesClient;
-import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml
index 63155f6991..e3ef7c0bf6 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml
@@ -31,22 +31,7 @@
org.apache.streampipes
- streampipes-container-standalone
- ${sp.version}
-
-
- org.apache.kafka
- kafka_2.10
-
-
- org.apache.kafka
- kafka-clients
-
-
-
-
- org.apache.streampipes
- streampipes-commons
+ streampipes-service-extensions
${sp.version}
@@ -81,7 +66,7 @@
org.apache.streampipes
- streampipes-sdk
+ streampipes-sdk-bundle
${sp.version}
@@ -92,48 +77,13 @@
org.slf4j
slf4j-api
- 1.7.24
+ 2.0.6
org.apache.streampipes
streampipes-config
${sp.version}
-
- org.apache.streampipes
- streampipes-dataformat-json
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-dataformat-cbor
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-dataformat-smile
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-dataformat-fst
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-messaging-jms
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-messaging-kafka
- ${sp.version}
-
-
- org.apache.streampipes
- streampipes-messaging-mqtt
- ${sp.version}
-
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java
index 39235cb3d7..ef8c40914a 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -21,36 +21,48 @@
#set( $symbol_escape = '\' )
package ${package};
-import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
-import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
-import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
+import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
+import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
-import ${package}.config.Config;
+import ${package}.config.ConfigKeys;
import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller;
-public class Init extends StandaloneModelSubmitter {
+public class Init extends ExtensionsModelSubmitter {
- public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- .add(new ${classNamePrefix}Controller());
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
+ public static void main(String[] args) throws Exception {
+ new Init().init();
+ }
- DeclarersSingleton.getInstance().registerProtocols(
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("${package}",
+ "Apache Flink sink",
+ "",
+ 8090)
+ .registerPipelineElement(new ${classNamePrefix}Controller())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
-
- new Init().init(Config.INSTANCE);
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+ .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+ .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+ .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
+ .addConfig(ConfigKeys.SERVICE_NAME, "sp fft stream analytics metrics", "Data processor service name")
+ .addConfig(ConfigKeys.HOST, "${artifactId}", "Data processor host")
+ .build();
}
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
deleted file mode 100644
index 16e02a4464..0000000000
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#set( $symbol_pound = '#' )
-#set( $symbol_dollar = '$' )
-#set( $symbol_escape = '\' )
-#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") )
-package ${package}.config;
-
-import org.apache.streampipes.config.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum Config implements PeConfig {
- INSTANCE;
-
- private SpConfig config;
- public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
- private final static String SERVICE_ID = "pe/${package}.sink.flink";
-
- Config() {
- config = SpConfig.getSpConfig(SERVICE_ID);
- config.register(ConfigKeys.HOST, "${artifactId}", "Data sink host");
- config.register(ConfigKeys.PORT, 8090, "Data sink port");
- config.register(ConfigKeys.SERVICE_NAME, "${svc_name}", "Data sink service name");
- config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host");
- config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port");
- config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
- }
-
- public String getFlinkHost() {
- return config.getString(ConfigKeys.FLINK_HOST);
- }
-
- public int getFlinkPort() {
- return config.getInteger(ConfigKeys.FLINK_PORT);
- }
-
- public boolean getFlinkDebug() {
- return config.getBoolean(ConfigKeys.FLINK_DEBUG);
- }
-
- @Override
- public String getHost() {
- return config.getString(ConfigKeys.HOST);
- }
-
- @Override
- public int getPort() {
- return config.getInteger(ConfigKeys.PORT);
- }
-
- @Override
- public String getId() {
- return SERVICE_ID;
- }
-
- @Override
- public String getName() {
- return config.getString(ConfigKeys.SERVICE_NAME);
- }
-
-}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
index e307f0da9c..a828f48101 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -22,10 +22,11 @@
package ${package}.config;
public class ConfigKeys {
- final static String HOST = "SP_HOST";
- final static String PORT = "SP_PORT";
- final static String SERVICE_NAME = "SP_SERVICE_NAME";
- final static String FLINK_HOST = "SP_FLINK_HOST";
- final static String FLINK_PORT = "SP_FLINK_PORT";
- final static String FLINK_DEBUG = "SP_FLINK_DEBUG";
-}
\ No newline at end of file
+ public final static String HOST = "SP_HOST";
+ public final static String PORT = "SP_PORT";
+ public final static String SERVICE_NAME = "SP_SERVICE_NAME";
+ public final static String FLINK_HOST = "SP_FLINK_HOST";
+ public final static String FLINK_PORT = "SP_FLINK_PORT";
+ public static final String DEBUG = "SP_FLINK_DEBUG";
+ public static final String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
+}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
index b0bf3ee6be..eaae81d257 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
@@ -21,6 +21,8 @@
package ${package}.pe.sink.${packageName};
import ${package}.config.Config;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -29,8 +31,6 @@
import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.SupportedFormats;
-import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.wrapper.flink.FlinkDataSinkDeclarer;
import org.apache.streampipes.wrapper.flink.FlinkDataSinkRuntime;
import org.apache.streampipes.sdk.helpers.*;
@@ -59,13 +59,16 @@ public DataSinkDescription declareModel() {
}
@Override
- public FlinkDataSinkRuntime<${classNamePrefix}Parameters> getRuntime(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
+ public FlinkDataSinkRuntime<${classNamePrefix}Parameters> getRuntime(DataSinkInvocation graph,
+ DataSinkParameterExtractor extractor,
+ ConfigExtractor configExtractor,
+ StreamPipesClient streamPipesClient) {
String host = extractor.singleValueParameter(HOST_KEY, String.class);
int port = extractor.singleValueParameter(PORT_KEY, Integer.class);
String password = extractor.secretValue(PASSWORD_KEY);
${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, host, port, password);
- return new ${classNamePrefix}Program(params, Config.INSTANCE.getFlinkDebug());
+ return new ${classNamePrefix}Program(params, configExtractor, streamPipesClient);
}
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java
index 3fedc44d78..621a810b61 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java
@@ -21,13 +21,17 @@
#set( $symbol_escape = '\' )
package ${package}.pe.sink.${packageName};
-import ${package}.config.Config;
+import ${package}.config.ConfigKeys;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.wrapper.flink.FlinkDataSinkRuntime;
import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
import java.io.Serializable;
public class ${classNamePrefix}Program extends FlinkDataSinkRuntime<${classNamePrefix}Parameters>
@@ -36,15 +40,20 @@ public class ${classNamePrefix}Program extends FlinkDataSinkRuntime<${classNameP
private static final long serialVersionUID = 1L;
private final ${classNamePrefix}Parameters params;
- public ${classNamePrefix}Program(${classNamePrefix}Parameters params, boolean debug) {
- super(params, debug);
+ public ${classNamePrefix}Program(${classNamePrefix}Parameters params,
+ ConfigExtractor configExtractor,
+ StreamPipesClient streamPipesClient) {
+ super(params, configExtractor, streamPipesClient);
this.params = params;
}
@Override
- protected FlinkDeploymentConfig getDeploymentConfig() {
- return new FlinkDeploymentConfig(Config.JAR_FILE,
- Config.INSTANCE.getFlinkHost(), Config.INSTANCE.getFlinkPort());
+ protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+ SpConfig config = configExtractor.getConfig();
+ return new FlinkDeploymentConfig(config.getString(ConfigKeys.FLINK_JAR_FILE_LOC),
+ config.getString(ConfigKeys.FLINK_HOST),
+ config.getInteger(ConfigKeys.FLINK_PORT),
+ config.getBoolean(ConfigKeys.DEBUG));
}
@Override
diff --git a/streampipes-service-base/pom.xml b/streampipes-service-base/pom.xml
index 1dbba04e6e..5f030f24fa 100644
--- a/streampipes-service-base/pom.xml
+++ b/streampipes-service-base/pom.xml
@@ -105,6 +105,10 @@
org.osgi.core
provided
+
+ org.slf4j
+ slf4j-api
+
org.springframework
spring-aop