diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index abab204fff..07a98cce44 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -29,7 +29,6 @@
import org.apache.streampipes.rest.impl.datalake.DataLakeDashboardResource;
import org.apache.streampipes.rest.impl.datalake.DataLakeWidgetResource;
import org.apache.streampipes.rest.impl.datalake.PersistedDataStreamResource;
-import org.apache.streampipes.rest.impl.nouser.PipelineElementImportNoUser;
import org.apache.streampipes.rest.impl.pe.DataProcessorResource;
import org.apache.streampipes.rest.impl.pe.DataSinkResource;
import org.apache.streampipes.rest.impl.pe.DataStreamResource;
@@ -85,7 +84,6 @@ public StreamPipesResourceConfig() {
register(PipelineElementAsset.class);
register(PipelineElementCategory.class);
register(PipelineElementFile.class);
- register(PipelineElementImportNoUser.class);
register(PipelineElementImport.class);
register(PipelineElementPreview.class);
register(PipelineElementRuntimeInfo.class);
diff --git a/streampipes-messaging-jms/pom.xml b/streampipes-messaging-jms/pom.xml
index 3edcc5c3db..2f6828361a 100644
--- a/streampipes-messaging-jms/pom.xml
+++ b/streampipes-messaging-jms/pom.xml
@@ -42,4 +42,26 @@
-
\ No newline at end of file
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+ true
+ true
+
+
+
+
+
+
diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConnectionProvider.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConnectionProvider.java
index 835dd7df6a..4de17d1678 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConnectionProvider.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConnectionProvider.java
@@ -25,16 +25,16 @@
public abstract class ActiveMQConnectionProvider {
- protected Connection startJmsConnection(String url) {
- try {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
- connectionFactory.setAlwaysSyncSend(false);
- Connection connect = connectionFactory.createConnection();
+ protected Connection startJmsConnection(String url) {
+ try {
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+ connectionFactory.setAlwaysSyncSend(false);
+ Connection connect = connectionFactory.createConnection();
- connect.start();
- return connect;
- } catch (JMSException e) {
- throw new AssertionError("Failed to establish the JMS-Connection!", e);
- }
+ connect.start();
+ return connect;
+ } catch (JMSException e) {
+ throw new AssertionError("Failed to establish the JMS-Connection!", e);
}
+ }
}
diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
index 4e85243295..f043f785b4 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
@@ -18,22 +18,24 @@
package org.apache.streampipes.messaging.jms;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.util.ByteSequence;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.util.ByteSequence;
+
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
+
import java.io.Serializable;
public class ActiveMQConsumer extends ActiveMQConnectionProvider implements
- EventConsumer,
- AutoCloseable, Serializable {
+ EventConsumer,
+ AutoCloseable, Serializable {
private Session session;
private MessageConsumer consumer;
@@ -57,13 +59,15 @@ private void initListener() {
@Override
public void connect(JmsTransportProtocol protocolSettings, InternalEventProcessor
- eventProcessor) throws SpRuntimeException {
+ eventProcessor) throws SpRuntimeException {
String url = ActiveMQUtils.makeActiveMqUrl(protocolSettings);
try {
this.eventProcessor = eventProcessor;
session = startJmsConnection(url).createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createConsumer(session.createTopic(protocolSettings.getTopicDefinition().getActualTopicName()));
+ consumer = session.createConsumer(session.createTopic(
+ protocolSettings.getTopicDefinition().getActualTopicName())
+ );
initListener();
this.connected = true;
} catch (JMSException e) {
diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
index 40a11640d3..374f504b24 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
@@ -18,14 +18,15 @@
package org.apache.streampipes.messaging.jms;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -96,16 +97,16 @@ public void connect(JmsTransportProtocol protocolSettings) throws SpRuntimeExcep
try {
this.session = connection
- .createSession(false, Session.AUTO_ACKNOWLEDGE);
+ .createSession(false, Session.AUTO_ACKNOWLEDGE);
this.producer = session.createProducer(session.createTopic(protocolSettings
- .getTopicDefinition()
- .getActualTopicName()));
+ .getTopicDefinition()
+ .getActualTopicName()));
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
this.connection.start();
this.connected = true;
} catch (JMSException e) {
- throw new SpRuntimeException("could not connect to activemq broker. Broker: '" +
- protocolSettings.getBrokerHostname() + "' Port: " + protocolSettings.getPort());
+ throw new SpRuntimeException("could not connect to activemq broker. Broker: '"
+ + protocolSettings.getBrokerHostname() + "' Port: " + protocolSettings.getPort());
}
}
diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java
index 0a66af0786..5bda0e642f 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.messaging.jms;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,15 +15,16 @@
* limitations under the License.
*
*/
+package org.apache.streampipes.messaging.jms;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
public class ActiveMQUtils {
- private static final String TCP_PROTOCOL = "tcp://";
- private static final String COLON = ":";
+ private static final String TCP_PROTOCOL = "tcp://";
+ private static final String COLON = ":";
- public static String makeActiveMqUrl(JmsTransportProtocol protocol) {
- return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort();
- }
+ public static String makeActiveMqUrl(JmsTransportProtocol protocol) {
+ return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort();
+ }
}
diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/SpJmsProtocol.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/SpJmsProtocol.java
index 293f089966..dcdf03c6f4 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/SpJmsProtocol.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/SpJmsProtocol.java
@@ -24,8 +24,8 @@
public class SpJmsProtocol implements SpProtocolDefinition {
- private EventConsumer jmsConsumer;
- private EventProducer jmsProducer;
+ private final EventConsumer jmsConsumer;
+ private final EventProducer jmsProducer;
public SpJmsProtocol() {
this.jmsConsumer = new ActiveMQConsumer();
diff --git a/streampipes-messaging-kafka/pom.xml b/streampipes-messaging-kafka/pom.xml
index 29485016d6..b23939e3bd 100644
--- a/streampipes-messaging-kafka/pom.xml
+++ b/streampipes-messaging-kafka/pom.xml
@@ -46,4 +46,26 @@
kafka-clients
-
\ No newline at end of file
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+ true
+ true
+
+
+
+
+
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index cf1657dc09..1411b9c03f 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -18,10 +18,6 @@
package org.apache.streampipes.messaging.kafka;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
@@ -29,17 +25,26 @@
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.WildcardTopicDefinition;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
import java.util.regex.Pattern;
public class SpKafkaConsumer implements EventConsumer, Runnable,
- Serializable {
+ Serializable {
private String topic;
private InternalEventProcessor eventProcessor;
@@ -97,9 +102,7 @@ public void onPartitionsAssigned(Collection partitions) {
Duration duration = Duration.of(100, ChronoUnit.MILLIS);
while (isRunning) {
ConsumerRecords records = consumer.poll(duration);
- records.forEach(record -> {
- eventProcessor.onEvent(record.value());
- });
+ records.forEach(record -> eventProcessor.onEvent(record.value()));
}
LOG.info("Closing Kafka Consumer.");
consumer.close();
@@ -111,14 +114,14 @@ private String replaceWildcardWithPatternFormat(String topic) {
}
private Properties makeProperties(KafkaTransportProtocol protocol,
- List appenders) {
+ List appenders) {
return new ConsumerConfigFactory(protocol).buildProperties(appenders);
}
@Override
public void connect(KafkaTransportProtocol protocol, InternalEventProcessor
- eventProcessor)
- throws SpRuntimeException {
+ eventProcessor)
+ throws SpRuntimeException {
LOG.info("Kafka consumer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
if (protocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
this.patternTopic = true;
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index 7c46bb911c..afd3d44a77 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -18,22 +18,31 @@
package org.apache.streampipes.messaging.kafka;
-import org.apache.kafka.clients.admin.*;
+import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.TopicConfig;
-import org.apache.streampipes.commons.constants.Envs;
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.streampipes.messaging.EventProducer;
-import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import java.io.Serializable;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SpKafkaProducer implements EventProducer, Serializable {
@@ -50,7 +59,8 @@ public class SpKafkaProducer implements EventProducer, S
private static final Logger LOG = LoggerFactory.getLogger(SpKafkaProducer.class);
- public SpKafkaProducer() { }
+ public SpKafkaProducer() {
+ }
// TODO backwards compatibility, remove later
public SpKafkaProducer(String url,
@@ -58,7 +68,7 @@ public SpKafkaProducer(String url,
List appenders) {
String[] urlParts = url.split(COLON);
KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
- Integer.parseInt(urlParts[1]), topic);
+ Integer.parseInt(urlParts[1]), topic);
this.brokerUrl = url;
this.topic = topic;
this.producer = new KafkaProducer<>(makeProperties(protocol, appenders));
@@ -115,7 +125,8 @@ private void createKafaTopic(KafkaTransportProtocol settings) throws ExecutionEx
if (!topicExists(topics)) {
Map topicConfig = new HashMap<>();
- String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
+ String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists()
+ ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);
final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProtocol.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProtocol.java
index 305a0c12ab..0a921092d3 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProtocol.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProtocol.java
@@ -25,8 +25,8 @@
public class SpKafkaProtocol implements SpProtocolDefinition {
- private EventConsumer kafkaConsumer;
- private EventProducer kafkaProducer;
+ private final EventConsumer kafkaConsumer;
+ private final EventProducer kafkaProducer;
public SpKafkaProtocol() {
this.kafkaConsumer = new SpKafkaConsumer();
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 2aa488aed5..461d96e04f 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -17,9 +17,10 @@
*/
package org.apache.streampipes.messaging.kafka.config;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import java.util.Properties;
import java.util.UUID;
@@ -44,10 +45,10 @@ public Properties makeDefaultProperties() {
props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId, UUID.randomUUID().toString()));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
- AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT);
+ AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG_DEFAULT);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
- getConfigOrDefault(protocol::getMessageMaxBytes, FETCH_MAX_BYTES_CONFIG_DEFAULT));
+ getConfigOrDefault(protocol::getMessageMaxBytes, FETCH_MAX_BYTES_CONFIG_DEFAULT));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT);
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
index a8f945a756..1e53ebd225 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
@@ -22,5 +22,5 @@
public interface KafkaConfigAppender {
- void appendConfig(Properties props);
+ void appendConfig(Properties props);
}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
index 23426d1248..197cdffcff 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
@@ -17,10 +17,11 @@
*/
package org.apache.streampipes.messaging.kafka.config;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import java.util.Properties;
@@ -37,11 +38,8 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
private static final String VALUE_SERIALIZER_DEFAULT = ByteArraySerializer.class.getName();
-
public ProducerConfigFactory(KafkaTransportProtocol protocol) {
super(protocol);
-
-
}
@Override
@@ -49,14 +47,14 @@ public Properties makeDefaultProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
props.put(ProducerConfig.ACKS_CONFIG, getConfigOrDefault(protocol::getAcks,
- ACKS_CONFIG_DEFAULT));
+ ACKS_CONFIG_DEFAULT));
props.put(ProducerConfig.RETRIES_CONFIG, RETRIES_CONFIG_DEFAULT);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,
- getConfigOrDefault(protocol::getBatchSize, BATCH_SIZE_CONFIG_DEFAULT));
+ getConfigOrDefault(protocol::getBatchSize, BATCH_SIZE_CONFIG_DEFAULT));
props.put(ProducerConfig.LINGER_MS_CONFIG,
- getConfigOrDefault(protocol::getLingerMs, LINGER_MS_DEFAULT));
+ getConfigOrDefault(protocol::getLingerMs, LINGER_MS_DEFAULT));
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getConfigOrDefault(protocol::getMaxRequestSize,
- MAX_REQUEST_SIZE_CONFIG_DEFAULT));
+ MAX_REQUEST_SIZE_CONFIG_DEFAULT));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG_DEFAULT);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_DEFAULT);
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
index eb40e57872..a7e5fa94fc 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
@@ -26,21 +26,26 @@
public class KafkaSecuritySaslPlainConfig extends KafkaSecurityConfig {
- private final String username;
- private final String password;
+ private final String username;
+ private final String password;
- public KafkaSecuritySaslPlainConfig(String username, String password) {
- this.username = username;
- this.password = password;
- }
+ public KafkaSecuritySaslPlainConfig(String username, String password) {
+ this.username = username;
+ this.password = password;
+ }
- @Override
- public void appendConfig(Properties props) {
+ @Override
+ public void appendConfig(Properties props) {
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
+ props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
- String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
- props.put(SaslConfigs.SASL_JAAS_CONFIG, SASL_JAAS_CONFIG);
- }
+ String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ + username
+ + "\" password=\""
+ + password
+ + "\";";
+
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+ }
}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
index 9b77841b1f..2c5ef691e3 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
@@ -26,21 +26,26 @@
public class KafkaSecuritySaslSSLConfig extends KafkaSecurityConfig {
- private final String username;
- private final String password;
+ private final String username;
+ private final String password;
- public KafkaSecuritySaslSSLConfig(String username, String password) {
- this.username = username;
- this.password = password;
- }
+ public KafkaSecuritySaslSSLConfig(String username, String password) {
+ this.username = username;
+ this.password = password;
+ }
- @Override
- public void appendConfig(Properties props) {
+ @Override
+ public void appendConfig(Properties props) {
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString());
+ props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString());
- String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
- props.put(SaslConfigs.SASL_JAAS_CONFIG, SASL_JAAS_CONFIG);
- }
+ String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ + username
+ + "\" password=\""
+ + password
+ + "\";";
+
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+ }
}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
index 52795e7fe8..235a6eda39 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
@@ -22,9 +22,8 @@
public class KafkaSecurityUnauthenticatedPlainConfig extends KafkaSecurityConfig {
+ @Override
+ public void appendConfig(Properties props) {
- @Override
- public void appendConfig(Properties props) {
-
- }
+ }
}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
index 84e20296a3..8fdd02fd12 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
@@ -25,8 +25,8 @@
public class KafkaSecurityUnauthenticatedSSLConfig extends KafkaSecurityConfig {
- @Override
- public void appendConfig(Properties props) {
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.toString());
- }
+ @Override
+ public void appendConfig(Properties props) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.toString());
+ }
}
diff --git a/streampipes-messaging-mqtt/pom.xml b/streampipes-messaging-mqtt/pom.xml
index d157488efc..540d57caea 100644
--- a/streampipes-messaging-mqtt/pom.xml
+++ b/streampipes-messaging-mqtt/pom.xml
@@ -45,4 +45,25 @@
-
\ No newline at end of file
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+ true
+ true
+
+
+
+
+
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
index 6ba637a9b4..c5cad2b6d8 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -1,22 +1,24 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.mqtt;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
index 51659912d5..d19d4b575b 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
@@ -1,18 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.mqtt;
@@ -20,6 +21,7 @@
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
@@ -27,11 +29,13 @@
import java.io.Serializable;
public class MqttConsumer extends AbstractMqttConnector implements
- EventConsumer,
- AutoCloseable, Serializable {
+ EventConsumer,
+ AutoCloseable, Serializable {
@Override
- public void connect(MqttTransportProtocol protocolSettings, InternalEventProcessor eventProcessor) throws SpRuntimeException {
+ public void connect(MqttTransportProtocol protocolSettings, InternalEventProcessor eventProcessor)
+ throws SpRuntimeException {
+
try {
this.createBrokerConnection(protocolSettings);
Topic[] topics = {new Topic(protocolSettings.getTopicDefinition().getActualTopicName(), QoS.AT_LEAST_ONCE)};
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
index 91dbd38213..cf4f8356ec 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
@@ -1,24 +1,26 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.mqtt;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
import org.fusesource.mqtt.client.QoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java
index 9c259bf5f4..ccda09075f 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java
@@ -1,18 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.mqtt;
@@ -23,8 +24,8 @@
public class SpMqttProtocol implements SpProtocolDefinition {
- private EventConsumer mqttConsumer;
- private EventProducer mqttProducer;
+ private final EventConsumer mqttConsumer;
+ private final EventProducer mqttProducer;
public SpMqttProtocol() {
this.mqttConsumer = new MqttConsumer();
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java
index dea95abfde..7fad10f689 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java
@@ -1,18 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.mqtt;
@@ -22,7 +23,7 @@
import org.apache.streampipes.model.grounding.TransportProtocol;
public class SpMqttProtocolFactory extends SpProtocolDefinitionFactory {
-
+
@Override
public TransportProtocol getTransportProtocol() {
return new MqttTransportProtocol();
diff --git a/streampipes-messaging-nats/pom.xml b/streampipes-messaging-nats/pom.xml
index 5fdc5a690f..cfb27a1bcd 100644
--- a/streampipes-messaging-nats/pom.xml
+++ b/streampipes-messaging-nats/pom.xml
@@ -46,4 +46,25 @@
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+ true
+ true
+
+
+
+
diff --git a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/AbstractNatsConnector.java b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/AbstractNatsConnector.java
index f72f1e961d..5253cdf243 100644
--- a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/AbstractNatsConnector.java
+++ b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/AbstractNatsConnector.java
@@ -1,27 +1,29 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.nats;
+import org.apache.streampipes.model.grounding.NatsTransportProtocol;
+import org.apache.streampipes.model.nats.NatsConfig;
+
import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Options;
-import org.apache.streampipes.model.grounding.NatsTransportProtocol;
-import org.apache.streampipes.model.nats.NatsConfig;
import java.io.IOException;
import java.time.Duration;
diff --git a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsConsumer.java b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsConsumer.java
index b54fe0f672..ee02fab772 100644
--- a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsConsumer.java
+++ b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsConsumer.java
@@ -1,31 +1,33 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.nats;
-import io.nats.client.Connection;
-import io.nats.client.Dispatcher;
-import io.nats.client.Subscription;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.grounding.NatsTransportProtocol;
import org.apache.streampipes.model.nats.NatsConfig;
+import io.nats.client.Connection;
+import io.nats.client.Dispatcher;
+import io.nats.client.Subscription;
+
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@@ -69,8 +71,7 @@ public boolean isConnected() {
private void createSubscription(InternalEventProcessor eventProcessor) {
dispatcher = natsConnection.createDispatcher((message) -> {});
- this.subscription = dispatcher.subscribe(subject, (message) -> {
- eventProcessor.onEvent(message.getData());
- });
+ this.subscription = dispatcher.subscribe(subject, (message) ->
+ eventProcessor.onEvent(message.getData()));
}
}
diff --git a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsPublisher.java b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsPublisher.java
index f4b0c75f6d..06e7cc7325 100644
--- a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsPublisher.java
+++ b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsPublisher.java
@@ -1,27 +1,29 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.nats;
-import io.nats.client.Connection;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.model.grounding.NatsTransportProtocol;
+import io.nats.client.Connection;
+
import java.io.IOException;
import java.util.concurrent.TimeoutException;
diff --git a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsUtils.java b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsUtils.java
index fb15db9351..7b49469621 100644
--- a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsUtils.java
+++ b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/NatsUtils.java
@@ -1,24 +1,25 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
-
package org.apache.streampipes.messaging.nats;
-import io.nats.client.Options;
import org.apache.streampipes.model.nats.NatsConfig;
+
+import io.nats.client.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/SpNatsProtocol.java b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/SpNatsProtocol.java
index 847507ca04..bf3abb3de2 100644
--- a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/SpNatsProtocol.java
+++ b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/SpNatsProtocol.java
@@ -1,18 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.nats;
@@ -24,13 +25,14 @@
public class SpNatsProtocol implements SpProtocolDefinition {
- private EventConsumer natsConsumer;
- private EventProducer natsProducer;
+ private final EventConsumer natsConsumer;
+ private final EventProducer natsProducer;
public SpNatsProtocol() {
this.natsConsumer = new NatsConsumer();
this.natsProducer = new NatsPublisher();
}
+
@Override
public EventConsumer getConsumer() {
return this.natsConsumer;
diff --git a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/SpNatsProtocolFactory.java b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/SpNatsProtocolFactory.java
index 7d6812cfde..5d18e2b0b7 100644
--- a/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/SpNatsProtocolFactory.java
+++ b/streampipes-messaging-nats/src/main/java/org/apache/streampipes/messaging/nats/SpNatsProtocolFactory.java
@@ -1,18 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.streampipes.messaging.nats;
diff --git a/streampipes-messaging/pom.xml b/streampipes-messaging/pom.xml
index bc9af0206a..1415a3beb9 100644
--- a/streampipes-messaging/pom.xml
+++ b/streampipes-messaging/pom.xml
@@ -35,4 +35,26 @@
0.71.0-SNAPSHOT
-
\ No newline at end of file
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+ true
+ true
+
+
+
+
+
diff --git a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventConsumer.java b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventConsumer.java
index 3e276df67a..dec6f0258f 100644
--- a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventConsumer.java
+++ b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventConsumer.java
@@ -21,12 +21,12 @@
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.grounding.TransportProtocol;
-public interface EventConsumer {
+public interface EventConsumer {
- void connect(TP protocolSettings, InternalEventProcessor eventProcessor) throws
- SpRuntimeException;
+ void connect(T protocolSettings, InternalEventProcessor eventProcessor) throws
+ SpRuntimeException;
- void disconnect() throws SpRuntimeException;
+ void disconnect() throws SpRuntimeException;
- boolean isConnected();
+ boolean isConnected();
}
diff --git a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventProducer.java b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventProducer.java
index 9d3998bc7f..20993d932d 100644
--- a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventProducer.java
+++ b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventProducer.java
@@ -23,13 +23,13 @@
import java.io.Serializable;
-public interface EventProducer extends Serializable {
+public interface EventProducer extends Serializable {
- void connect(TP protocolSettings) throws SpRuntimeException;
+ void connect(T protocolSettings) throws SpRuntimeException;
- void publish(byte[] event);
+ void publish(byte[] event);
- void disconnect() throws SpRuntimeException;
+ void disconnect() throws SpRuntimeException;
- boolean isConnected();
+ boolean isConnected();
}
diff --git a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/SpProtocolDefinition.java b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/SpProtocolDefinition.java
index 2a1655c3bb..cd24baad59 100644
--- a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/SpProtocolDefinition.java
+++ b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/SpProtocolDefinition.java
@@ -20,9 +20,9 @@
import org.apache.streampipes.model.grounding.TransportProtocol;
-public interface SpProtocolDefinition {
+public interface SpProtocolDefinition {
- EventConsumer getConsumer();
+ EventConsumer getConsumer();
- EventProducer getProducer();
+ EventProducer getProducer();
}
diff --git a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/SpProtocolManager.java b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/SpProtocolManager.java
index d15199f4e5..35e2d9b6e6 100644
--- a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/SpProtocolManager.java
+++ b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/SpProtocolManager.java
@@ -28,7 +28,7 @@ public enum SpProtocolManager {
INSTANCE;
- private List> availableProtocols;
+ private final List> availableProtocols;
SpProtocolManager() {
this.availableProtocols = new ArrayList<>();
@@ -42,17 +42,16 @@ public List> getAvailab
return availableProtocols;
}
- public Optional> findDefinition(T
- transportProtocol) {
+ public Optional> findDefinition(T transportProtocol) {
// TODO add RDF URI for protocol in model
return this.availableProtocols
- .stream()
- .filter
- (adf -> adf.getTransportProtocolClass().equals(transportProtocol.getClass()
- .getCanonicalName()))
- .map(s -> (SpProtocolDefinitionFactory) s)
- .map(SpProtocolDefinitionFactory::createInstance)
- .findFirst();
+ .stream()
+ .filter
+ (adf -> adf.getTransportProtocolClass().equals(transportProtocol.getClass()
+ .getCanonicalName()))
+ .map(s -> (SpProtocolDefinitionFactory) s)
+ .map(SpProtocolDefinitionFactory::createInstance)
+ .findFirst();
}
}
diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml
index 4daa8622af..c8545d7dd0 100644
--- a/streampipes-rest/pom.xml
+++ b/streampipes-rest/pom.xml
@@ -163,4 +163,26 @@
test
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+ true
+ true
+
+
+
+
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index faa8bf4229..2b5b724686 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -25,106 +25,111 @@
import org.apache.streampipes.manager.pipeline.PipelineCacheManager;
import org.apache.streampipes.manager.pipeline.PipelineCanvasMetadataCacheManager;
import org.apache.streampipes.manager.pipeline.PipelineManager;
-import org.apache.streampipes.resource.management.UserResourceManager;
-import org.apache.streampipes.model.file.FileMetadata;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.file.FileMetadata;
import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.resource.management.UserResourceManager;
import org.apache.streampipes.storage.api.IDashboardStorage;
import org.apache.streampipes.storage.api.IDashboardWidgetStorage;
import org.apache.streampipes.storage.api.IDataExplorerWidgetStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class ResetManagement {
- // This class should be moved into another package. I moved it here because I got a cyclic maven
- // dependency between this package and streampipes-pipeline-management
- // See in issue [STREAMPIPES-405]
-
- private static final Logger logger = LoggerFactory.getLogger(ResetManagement.class);
-
- /**
- * Remove all configurations for this user. This includes:
- * [pipeline assembly cache, pipelines, adapters, files]
- * @param username
- */
- public static void reset(String username) {
- logger.info("Start resetting the system");
-
- // Set hide tutorial to false for user
- UserResourceManager.setHideTutorial(username, true);
-
- // Clear pipeline assembly Cache
- PipelineCacheManager.removeCachedPipeline(username);
- PipelineCanvasMetadataCacheManager.removeCanvasMetadataFromCache(username);
-
- // Stop and delete all pipelines
- List allPipelines = PipelineManager.getAllPipelines();
- allPipelines.forEach(pipeline -> {
- PipelineManager.stopPipeline(pipeline.getPipelineId(), true);
- PipelineManager.deletePipeline(pipeline.getPipelineId());
- });
-
- // Stop and delete all adapters
- AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
-
+ // This class should be moved into another package. I moved it here because I got a cyclic maven
+ // dependency between this package and streampipes-pipeline-management
+ // See in issue [STREAMPIPES-405]
+
+ private static final Logger logger = LoggerFactory.getLogger(ResetManagement.class);
+
+ /**
+ * Remove all configurations for this user. This includes:
+ * [pipeline assembly cache, pipelines, adapters, files]
+ *
+ * @param username
+ */
+ public static void reset(String username) {
+ logger.info("Start resetting the system");
+
+ // Set hide tutorial to false for user
+ UserResourceManager.setHideTutorial(username, true);
+
+ // Clear pipeline assembly Cache
+ PipelineCacheManager.removeCachedPipeline(username);
+ PipelineCanvasMetadataCacheManager.removeCanvasMetadataFromCache(username);
+
+ // Stop and delete all pipelines
+ List allPipelines = PipelineManager.getAllPipelines();
+ allPipelines.forEach(pipeline -> {
+ PipelineManager.stopPipeline(pipeline.getPipelineId(), true);
+ PipelineManager.deletePipeline(pipeline.getPipelineId());
+ });
+
+ // Stop and delete all adapters
+ AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
+
+ try {
+ List allAdapters = adapterMasterManagement.getAllAdapterInstances();
+ allAdapters.forEach(adapterDescription -> {
try {
- List allAdapters = adapterMasterManagement.getAllAdapterInstances();
- allAdapters.forEach(adapterDescription -> {
- try {
- adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
- } catch (AdapterException e) {
- logger.error("Failed to delete adapter with id: " + adapterDescription.getElementId(), e);
- }
- });
+ adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
} catch (AdapterException e) {
- logger.error("Failed to load all adapter descriptions", e);
+ logger.error("Failed to delete adapter with id: " + adapterDescription.getElementId(), e);
}
-
- // Stop and delete all files
- List allFiles = FileManager.getAllFiles();
- allFiles.forEach(fileMetadata -> {
- FileManager.deleteFile(fileMetadata.getFileId());
- });
-
- // Remove all data in data lake
- DataLakeManagementV4 dataLakeManagementV4 = new DataLakeManagementV4();
- List allMeasurements = dataLakeManagementV4.getAllMeasurements();
- allMeasurements.forEach(measurement -> {
- boolean isSuccessDataLake = dataLakeManagementV4.removeMeasurement(measurement.getMeasureName());
-
- if (isSuccessDataLake) {
- dataLakeManagementV4.removeEventProperty(measurement.getMeasureName());
- }
- });
-
- // Remove all data views widgets
- IDataExplorerWidgetStorage widgetStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getDataExplorerWidgetStorage();
- widgetStorage.getAllDataExplorerWidgets().forEach(widget -> {
- widgetStorage.deleteDataExplorerWidget(widget.getId());
- });
-
- // Remove all data views
- IDashboardStorage dataLakeDashboardStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getDataExplorerDashboardStorage();
- dataLakeDashboardStorage.getAllDashboards().forEach(dashboard -> {
- dataLakeDashboardStorage.deleteDashboard(dashboard.getCouchDbId());
- });
-
- // Remove all dashboard widgets
- IDashboardWidgetStorage dashobardWidgetStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getDashboardWidgetStorage();
- dashobardWidgetStorage.getAllDashboardWidgets().forEach(widget -> {
- dashobardWidgetStorage.deleteDashboardWidget(widget.getId());
- });
-
- // Remove all dashboards
- IDashboardStorage dashboardStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getDashboardStorage();
- dashboardStorage.getAllDashboards().forEach(dashboard -> {
- dashboardStorage.deleteDashboard(dashboard.getCouchDbId());
- });
-
- logger.info("Resetting the system was completed");
+ });
+ } catch (AdapterException e) {
+ logger.error("Failed to load all adapter descriptions", e);
}
+
+ // Stop and delete all files
+ List allFiles = FileManager.getAllFiles();
+ allFiles.forEach(fileMetadata -> {
+ FileManager.deleteFile(fileMetadata.getFileId());
+ });
+
+ // Remove all data in data lake
+ DataLakeManagementV4 dataLakeManagementV4 = new DataLakeManagementV4();
+ List allMeasurements = dataLakeManagementV4.getAllMeasurements();
+ allMeasurements.forEach(measurement -> {
+ boolean isSuccessDataLake = dataLakeManagementV4.removeMeasurement(measurement.getMeasureName());
+
+ if (isSuccessDataLake) {
+ dataLakeManagementV4.removeEventProperty(measurement.getMeasureName());
+ }
+ });
+
+ // Remove all data views widgets
+ IDataExplorerWidgetStorage widgetStorage =
+ StorageDispatcher.INSTANCE.getNoSqlStore().getDataExplorerWidgetStorage();
+ widgetStorage.getAllDataExplorerWidgets().forEach(widget -> {
+ widgetStorage.deleteDataExplorerWidget(widget.getId());
+ });
+
+ // Remove all data views
+ IDashboardStorage dataLakeDashboardStorage =
+ StorageDispatcher.INSTANCE.getNoSqlStore().getDataExplorerDashboardStorage();
+ dataLakeDashboardStorage.getAllDashboards().forEach(dashboard -> {
+ dataLakeDashboardStorage.deleteDashboard(dashboard.getCouchDbId());
+ });
+
+ // Remove all dashboard widgets
+ IDashboardWidgetStorage dashobardWidgetStorage =
+ StorageDispatcher.INSTANCE.getNoSqlStore().getDashboardWidgetStorage();
+ dashobardWidgetStorage.getAllDashboardWidgets().forEach(widget -> {
+ dashobardWidgetStorage.deleteDashboardWidget(widget.getId());
+ });
+
+ // Remove all dashboards
+ IDashboardStorage dashboardStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getDashboardStorage();
+ dashboardStorage.getAllDashboards().forEach(dashboard -> {
+ dashboardStorage.deleteDashboard(dashboard.getCouchDbId());
+ });
+
+ logger.info("Resetting the system was completed");
+ }
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineElement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineElement.java
deleted file mode 100644
index 01cf383eee..0000000000
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineElement.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.rest.api;
-
-
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-
-import javax.ws.rs.core.Response;
-
-public interface IPipelineElement {
-
- Response getAvailable();
- Response getOwn();
-
- Response removeOwn(String elementUri);
-
- Response getElement(String elementUri);
-
-
-}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
deleted file mode 100644
index 7c81afd5a3..0000000000
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.rest.api;
-
-import javax.ws.rs.core.Response;
-
-public interface IPipelineMonitoring {
-
- Response getLogInfoForPipeline(String pipelineId);
-
- Response getMetricsInfoForPipeline(String pipelineId);
-
- Response triggerMonitoringUpdate();
-}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/filter/TokenAuthenticationFilter.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/filter/TokenAuthenticationFilter.java
index 2a1cd9b77a..42b890b902 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/filter/TokenAuthenticationFilter.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/filter/TokenAuthenticationFilter.java
@@ -30,6 +30,7 @@
import org.apache.streampipes.user.management.model.UserAccountDetails;
import org.apache.streampipes.user.management.service.TokenService;
import org.apache.streampipes.user.management.util.TokenUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
@@ -42,72 +43,75 @@
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
import java.io.IOException;
public class TokenAuthenticationFilter extends OncePerRequestFilter {
- private final JwtTokenProvider tokenProvider;
- private final IUserStorage userStorage;
-
- private static final Logger logger = LoggerFactory.getLogger(TokenAuthenticationFilter.class);
-
- public TokenAuthenticationFilter() {
- this.tokenProvider = new JwtTokenProvider();
- this.userStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getUserStorageAPI();
- }
-
- @Override
- protected void doFilterInternal(HttpServletRequest request,
- HttpServletResponse response,
- FilterChain filterChain) throws ServletException, IOException {
- try {
- String jwt = getJwtFromRequest(request);
-
- if (StringUtils.hasText(jwt) && tokenProvider.validateJwtToken(jwt)) {
- String username = tokenProvider.getUserIdFromToken(jwt);
- applySuccessfulAuth(request, username);
- } else {
- String apiKey = getApiKeyFromRequest(request);
- String apiUser = getApiUserFromRequest(request);
- if (StringUtils.hasText(apiKey) && StringUtils.hasText(apiUser)) {
- String hashedToken = TokenUtil.hashToken(apiKey);
- boolean hasValidToken = new TokenService().hasValidToken(apiUser, hashedToken);
- if (hasValidToken) {
- applySuccessfulAuth(request, apiUser);
- }
- }
- }
- } catch (Exception ex) {
- logger.error("Could not set user authentication in security context", ex);
- }
-
- filterChain.doFilter(request, response);
- }
-
- private void applySuccessfulAuth(HttpServletRequest request,
- String username) {
- Principal user = userStorage.getUser(username);
- PrincipalUserDetails> userDetails = user instanceof UserAccount ? new UserAccountDetails((UserAccount) user) : new ServiceAccountDetails((ServiceAccount) user);
- UsernamePasswordAuthenticationToken authentication = new UsernamePasswordAuthenticationToken(userDetails, null, userDetails.getAuthorities());
- authentication.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
-
- SecurityContextHolder.getContext().setAuthentication(authentication);
- }
-
-
- private String getJwtFromRequest(HttpServletRequest request) {
- String bearerToken = request.getHeader(HttpConstants.AUTHORIZATION);
- if (StringUtils.hasText(bearerToken) && bearerToken.startsWith(HttpConstants.BEARER)) {
- return bearerToken.substring(7);
- }
- return null;
- }
-
- private String getApiKeyFromRequest(HttpServletRequest request) {
- return request.getHeader(HttpConstants.X_API_KEY);
- }
-
- private String getApiUserFromRequest(HttpServletRequest request) {
- return request.getHeader(HttpConstants.X_API_USER);
- }
+ private final JwtTokenProvider tokenProvider;
+ private final IUserStorage userStorage;
+
+ private static final Logger logger = LoggerFactory.getLogger(TokenAuthenticationFilter.class);
+
+ public TokenAuthenticationFilter() {
+ this.tokenProvider = new JwtTokenProvider();
+ this.userStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getUserStorageAPI();
+ }
+
+ @Override
+ protected void doFilterInternal(HttpServletRequest request,
+ HttpServletResponse response,
+ FilterChain filterChain) throws ServletException, IOException {
+ try {
+ String jwt = getJwtFromRequest(request);
+
+ if (StringUtils.hasText(jwt) && tokenProvider.validateJwtToken(jwt)) {
+ String username = tokenProvider.getUserIdFromToken(jwt);
+ applySuccessfulAuth(request, username);
+ } else {
+ String apiKey = getApiKeyFromRequest(request);
+ String apiUser = getApiUserFromRequest(request);
+ if (StringUtils.hasText(apiKey) && StringUtils.hasText(apiUser)) {
+ String hashedToken = TokenUtil.hashToken(apiKey);
+ boolean hasValidToken = new TokenService().hasValidToken(apiUser, hashedToken);
+ if (hasValidToken) {
+ applySuccessfulAuth(request, apiUser);
+ }
+ }
+ }
+ } catch (Exception ex) {
+ logger.error("Could not set user authentication in security context", ex);
+ }
+
+ filterChain.doFilter(request, response);
+ }
+
+ private void applySuccessfulAuth(HttpServletRequest request,
+ String username) {
+ Principal user = userStorage.getUser(username);
+ PrincipalUserDetails> userDetails = user instanceof UserAccount ? new UserAccountDetails((UserAccount) user) :
+ new ServiceAccountDetails((ServiceAccount) user);
+ UsernamePasswordAuthenticationToken authentication =
+ new UsernamePasswordAuthenticationToken(userDetails, null, userDetails.getAuthorities());
+ authentication.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
+
+ SecurityContextHolder.getContext().setAuthentication(authentication);
+ }
+
+
+ private String getJwtFromRequest(HttpServletRequest request) {
+ String bearerToken = request.getHeader(HttpConstants.AUTHORIZATION);
+ if (StringUtils.hasText(bearerToken) && bearerToken.startsWith(HttpConstants.BEARER)) {
+ return bearerToken.substring(7);
+ }
+ return null;
+ }
+
+ private String getApiKeyFromRequest(HttpServletRequest request) {
+ return request.getHeader(HttpConstants.X_API_KEY);
+ }
+
+ private String getApiUserFromRequest(HttpServletRequest request) {
+ return request.getHeader(HttpConstants.X_API_USER);
+ }
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AccountActivationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AccountActivationResource.java
index 5215dd0370..9737178455 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AccountActivationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AccountActivationResource.java
@@ -20,7 +20,11 @@
import org.apache.streampipes.commons.exceptions.UserNotFoundException;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetDashboardResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetDashboardResource.java
index 5fa37a7921..9c4f5acf85 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetDashboardResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetDashboardResource.java
@@ -17,17 +17,25 @@
*/
package org.apache.streampipes.rest.impl;
-import org.apache.commons.io.FileUtils;
import org.apache.streampipes.model.client.assetdashboard.AssetDashboardConfig;
import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
import org.apache.streampipes.storage.api.IAssetDashboardStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import org.apache.commons.io.FileUtils;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
-import javax.ws.rs.*;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -119,7 +127,7 @@ public Response storeDashboardImage(@FormDataParam("file_upload") InputStream up
private String getTargetDirectory() {
return System.getProperty("user.home") + File.separator + ".streampipes"
- + File.separator + "assets" + File.separator + APP_ID;
+ + File.separator + "assets" + File.separator + APP_ID;
}
private String getTargetFile(String filename) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
index bbbc0a8bf7..6d42cac697 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
@@ -22,14 +22,23 @@
import org.apache.streampipes.rest.security.AuthConstants;
import org.apache.streampipes.storage.api.IGenericStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -47,7 +56,7 @@ public class AssetManagementResource extends AbstractAuthGuardedRestResource {
@Produces(MediaType.APPLICATION_JSON)
@PreAuthorize(AuthConstants.HAS_READ_ASSETS_PRIVILEGE)
public List