Skip to content

Commit

Permalink
Enable checkstyle for streampipes-messaging-jms (#820)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Dec 1, 2022
1 parent b11227e commit abcbe8a
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@

public abstract class ActiveMQConnectionProvider {

protected Connection startJmsConnection(String url) {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connectionFactory.setAlwaysSyncSend(false);
Connection connect = connectionFactory.createConnection();
protected Connection startJmsConnection(String url) {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connectionFactory.setAlwaysSyncSend(false);
Connection connect = connectionFactory.createConnection();

connect.start();
return connect;
} catch (JMSException e) {
throw new AssertionError("Failed to establish the JMS-Connection!", e);
}
connect.start();
return connect;
} catch (JMSException e) {
throw new AssertionError("Failed to establish the JMS-Connection!", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@

package org.apache.streampipes.messaging.jms;

import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;

import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.util.ByteSequence;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import java.io.Serializable;

public class ActiveMQConsumer extends ActiveMQConnectionProvider implements
EventConsumer<JmsTransportProtocol>,
AutoCloseable, Serializable {
EventConsumer<JmsTransportProtocol>,
AutoCloseable, Serializable {

private Session session;
private MessageConsumer consumer;
Expand All @@ -57,13 +59,15 @@ private void initListener() {

@Override
public void connect(JmsTransportProtocol protocolSettings, InternalEventProcessor<byte[]>
eventProcessor) throws SpRuntimeException {
eventProcessor) throws SpRuntimeException {
String url = ActiveMQUtils.makeActiveMqUrl(protocolSettings);

try {
this.eventProcessor = eventProcessor;
session = startJmsConnection(url).createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic(protocolSettings.getTopicDefinition().getActualTopicName()));
consumer = session.createConsumer(session.createTopic(
protocolSettings.getTopicDefinition().getActualTopicName())
);
initListener();
this.connected = true;
} catch (JMSException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

package org.apache.streampipes.messaging.jms;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.SimpleTopicDefinition;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
Expand Down Expand Up @@ -96,16 +97,16 @@ public void connect(JmsTransportProtocol protocolSettings) throws SpRuntimeExcep

try {
this.session = connection
.createSession(false, Session.AUTO_ACKNOWLEDGE);
.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.producer = session.createProducer(session.createTopic(protocolSettings
.getTopicDefinition()
.getActualTopicName()));
.getTopicDefinition()
.getActualTopicName()));
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
this.connection.start();
this.connected = true;
} catch (JMSException e) {
throw new SpRuntimeException("could not connect to activemq broker. Broker: '" +
protocolSettings.getBrokerHostname() + "' Port: " + protocolSettings.getPort());
throw new SpRuntimeException("could not connect to activemq broker. Broker: '"
+ protocolSettings.getBrokerHostname() + "' Port: " + protocolSettings.getPort());
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.streampipes.messaging.jms;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Expand All @@ -15,15 +15,16 @@
* limitations under the License.
*
*/
package org.apache.streampipes.messaging.jms;

import org.apache.streampipes.model.grounding.JmsTransportProtocol;

public class ActiveMQUtils {

private static final String TCP_PROTOCOL = "tcp://";
private static final String COLON = ":";
private static final String TCP_PROTOCOL = "tcp://";
private static final String COLON = ":";

public static String makeActiveMqUrl(JmsTransportProtocol protocol) {
return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort();
}
public static String makeActiveMqUrl(JmsTransportProtocol protocol) {
return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

public class SpJmsProtocol implements SpProtocolDefinition<JmsTransportProtocol> {

private EventConsumer<JmsTransportProtocol> jmsConsumer;
private EventProducer<JmsTransportProtocol> jmsProducer;
private final EventConsumer<JmsTransportProtocol> jmsConsumer;
private final EventProducer<JmsTransportProtocol> jmsProducer;

public SpJmsProtocol() {
this.jmsConsumer = new ActiveMQConsumer();
Expand Down

0 comments on commit abcbe8a

Please sign in to comment.