diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConnectionProvider.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConnectionProvider.java index 835dd7df6a..4de17d1678 100644 --- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConnectionProvider.java +++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConnectionProvider.java @@ -25,16 +25,16 @@ public abstract class ActiveMQConnectionProvider { - protected Connection startJmsConnection(String url) { - try { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); - connectionFactory.setAlwaysSyncSend(false); - Connection connect = connectionFactory.createConnection(); + protected Connection startJmsConnection(String url) { + try { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); + connectionFactory.setAlwaysSyncSend(false); + Connection connect = connectionFactory.createConnection(); - connect.start(); - return connect; - } catch (JMSException e) { - throw new AssertionError("Failed to establish the JMS-Connection!", e); - } + connect.start(); + return connect; + } catch (JMSException e) { + throw new AssertionError("Failed to establish the JMS-Connection!", e); } + } } diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java index 4e85243295..f043f785b4 100644 --- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java +++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java @@ -18,22 +18,24 @@ package org.apache.streampipes.messaging.jms; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.util.ByteSequence; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.messaging.EventConsumer; import org.apache.streampipes.messaging.InternalEventProcessor; import org.apache.streampipes.model.grounding.JmsTransportProtocol; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.util.ByteSequence; + import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; + import java.io.Serializable; public class ActiveMQConsumer extends ActiveMQConnectionProvider implements - EventConsumer, - AutoCloseable, Serializable { + EventConsumer, + AutoCloseable, Serializable { private Session session; private MessageConsumer consumer; @@ -57,13 +59,15 @@ private void initListener() { @Override public void connect(JmsTransportProtocol protocolSettings, InternalEventProcessor - eventProcessor) throws SpRuntimeException { + eventProcessor) throws SpRuntimeException { String url = ActiveMQUtils.makeActiveMqUrl(protocolSettings); try { this.eventProcessor = eventProcessor; session = startJmsConnection(url).createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createConsumer(session.createTopic(protocolSettings.getTopicDefinition().getActualTopicName())); + consumer = session.createConsumer(session.createTopic( + protocolSettings.getTopicDefinition().getActualTopicName()) + ); initListener(); this.connected = true; } catch (JMSException e) { diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java index 40a11640d3..374f504b24 100644 --- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java +++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java @@ -18,14 +18,15 @@ package org.apache.streampipes.messaging.jms; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.messaging.EventProducer; import org.apache.streampipes.model.grounding.JmsTransportProtocol; import org.apache.streampipes.model.grounding.SimpleTopicDefinition; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -96,16 +97,16 @@ public void connect(JmsTransportProtocol protocolSettings) throws SpRuntimeExcep try { this.session = connection - .createSession(false, Session.AUTO_ACKNOWLEDGE); + .createSession(false, Session.AUTO_ACKNOWLEDGE); this.producer = session.createProducer(session.createTopic(protocolSettings - .getTopicDefinition() - .getActualTopicName())); + .getTopicDefinition() + .getActualTopicName())); this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); this.connection.start(); this.connected = true; } catch (JMSException e) { - throw new SpRuntimeException("could not connect to activemq broker. Broker: '" + - protocolSettings.getBrokerHostname() + "' Port: " + protocolSettings.getPort()); + throw new SpRuntimeException("could not connect to activemq broker. Broker: '" + + protocolSettings.getBrokerHostname() + "' Port: " + protocolSettings.getPort()); } } diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java index 0a66af0786..5bda0e642f 100644 --- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java +++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java @@ -1,4 +1,4 @@ -package org.apache.streampipes.messaging.jms;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,15 +15,16 @@ * limitations under the License. * */ +package org.apache.streampipes.messaging.jms; import org.apache.streampipes.model.grounding.JmsTransportProtocol; public class ActiveMQUtils { - private static final String TCP_PROTOCOL = "tcp://"; - private static final String COLON = ":"; + private static final String TCP_PROTOCOL = "tcp://"; + private static final String COLON = ":"; - public static String makeActiveMqUrl(JmsTransportProtocol protocol) { - return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort(); - } + public static String makeActiveMqUrl(JmsTransportProtocol protocol) { + return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort(); + } } diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/SpJmsProtocol.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/SpJmsProtocol.java index 293f089966..dcdf03c6f4 100644 --- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/SpJmsProtocol.java +++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/SpJmsProtocol.java @@ -24,8 +24,8 @@ public class SpJmsProtocol implements SpProtocolDefinition { - private EventConsumer jmsConsumer; - private EventProducer jmsProducer; + private final EventConsumer jmsConsumer; + private final EventProducer jmsProducer; public SpJmsProtocol() { this.jmsConsumer = new ActiveMQConsumer();