Skip to content

Commit

Permalink
Merge pull request #827 from apache/SP-820
Browse files Browse the repository at this point in the history
Enable checkstyle for streampipes-messaging-* modules
  • Loading branch information
dominikriemer authored Dec 2, 2022
2 parents ae4a2ab + 152f9ea commit 35d55fa
Show file tree
Hide file tree
Showing 102 changed files with 1,930 additions and 1,581 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.streampipes.rest.impl.datalake.DataLakeDashboardResource;
import org.apache.streampipes.rest.impl.datalake.DataLakeWidgetResource;
import org.apache.streampipes.rest.impl.datalake.PersistedDataStreamResource;
import org.apache.streampipes.rest.impl.nouser.PipelineElementImportNoUser;
import org.apache.streampipes.rest.impl.pe.DataProcessorResource;
import org.apache.streampipes.rest.impl.pe.DataSinkResource;
import org.apache.streampipes.rest.impl.pe.DataStreamResource;
Expand Down Expand Up @@ -85,7 +84,6 @@ public StreamPipesResourceConfig() {
register(PipelineElementAsset.class);
register(PipelineElementCategory.class);
register(PipelineElementFile.class);
register(PipelineElementImportNoUser.class);
register(PipelineElementImport.class);
register(PipelineElementPreview.class);
register(PipelineElementRuntimeInfo.class);
Expand Down
24 changes: 23 additions & 1 deletion streampipes-messaging-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,26 @@
</dependency>
</dependencies>

</project>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@

public abstract class ActiveMQConnectionProvider {

protected Connection startJmsConnection(String url) {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connectionFactory.setAlwaysSyncSend(false);
Connection connect = connectionFactory.createConnection();
protected Connection startJmsConnection(String url) {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connectionFactory.setAlwaysSyncSend(false);
Connection connect = connectionFactory.createConnection();

connect.start();
return connect;
} catch (JMSException e) {
throw new AssertionError("Failed to establish the JMS-Connection!", e);
}
connect.start();
return connect;
} catch (JMSException e) {
throw new AssertionError("Failed to establish the JMS-Connection!", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@

package org.apache.streampipes.messaging.jms;

import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;

import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.util.ByteSequence;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import java.io.Serializable;

public class ActiveMQConsumer extends ActiveMQConnectionProvider implements
EventConsumer<JmsTransportProtocol>,
AutoCloseable, Serializable {
EventConsumer<JmsTransportProtocol>,
AutoCloseable, Serializable {

private Session session;
private MessageConsumer consumer;
Expand All @@ -57,13 +59,15 @@ private void initListener() {

@Override
public void connect(JmsTransportProtocol protocolSettings, InternalEventProcessor<byte[]>
eventProcessor) throws SpRuntimeException {
eventProcessor) throws SpRuntimeException {
String url = ActiveMQUtils.makeActiveMqUrl(protocolSettings);

try {
this.eventProcessor = eventProcessor;
session = startJmsConnection(url).createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic(protocolSettings.getTopicDefinition().getActualTopicName()));
consumer = session.createConsumer(session.createTopic(
protocolSettings.getTopicDefinition().getActualTopicName())
);
initListener();
this.connected = true;
} catch (JMSException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

package org.apache.streampipes.messaging.jms;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.SimpleTopicDefinition;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
Expand Down Expand Up @@ -96,16 +97,16 @@ public void connect(JmsTransportProtocol protocolSettings) throws SpRuntimeExcep

try {
this.session = connection
.createSession(false, Session.AUTO_ACKNOWLEDGE);
.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.producer = session.createProducer(session.createTopic(protocolSettings
.getTopicDefinition()
.getActualTopicName()));
.getTopicDefinition()
.getActualTopicName()));
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
this.connection.start();
this.connected = true;
} catch (JMSException e) {
throw new SpRuntimeException("could not connect to activemq broker. Broker: '" +
protocolSettings.getBrokerHostname() + "' Port: " + protocolSettings.getPort());
throw new SpRuntimeException("could not connect to activemq broker. Broker: '"
+ protocolSettings.getBrokerHostname() + "' Port: " + protocolSettings.getPort());
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.streampipes.messaging.jms;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Expand All @@ -15,15 +15,16 @@
* limitations under the License.
*
*/
package org.apache.streampipes.messaging.jms;

import org.apache.streampipes.model.grounding.JmsTransportProtocol;

public class ActiveMQUtils {

private static final String TCP_PROTOCOL = "tcp://";
private static final String COLON = ":";
private static final String TCP_PROTOCOL = "tcp://";
private static final String COLON = ":";

public static String makeActiveMqUrl(JmsTransportProtocol protocol) {
return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort();
}
public static String makeActiveMqUrl(JmsTransportProtocol protocol) {
return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

public class SpJmsProtocol implements SpProtocolDefinition<JmsTransportProtocol> {

private EventConsumer<JmsTransportProtocol> jmsConsumer;
private EventProducer<JmsTransportProtocol> jmsProducer;
private final EventConsumer<JmsTransportProtocol> jmsConsumer;
private final EventProducer<JmsTransportProtocol> jmsProducer;

public SpJmsProtocol() {
this.jmsConsumer = new ActiveMQConsumer();
Expand Down
24 changes: 23 additions & 1 deletion streampipes-messaging-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,26 @@
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
</project>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,33 @@

package org.apache.streampipes.messaging.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.kafka.config.ConsumerConfigFactory;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.WildcardTopicDefinition;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, Runnable,
Serializable {
Serializable {

private String topic;
private InternalEventProcessor<byte[]> eventProcessor;
Expand Down Expand Up @@ -97,9 +102,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
Duration duration = Duration.of(100, ChronoUnit.MILLIS);
while (isRunning) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
records.forEach(record -> {
eventProcessor.onEvent(record.value());
});
records.forEach(record -> eventProcessor.onEvent(record.value()));
}
LOG.info("Closing Kafka Consumer.");
consumer.close();
Expand All @@ -111,14 +114,14 @@ private String replaceWildcardWithPatternFormat(String topic) {
}

private Properties makeProperties(KafkaTransportProtocol protocol,
List<KafkaConfigAppender> appenders) {
List<KafkaConfigAppender> appenders) {
return new ConsumerConfigFactory(protocol).buildProperties(appenders);
}

@Override
public void connect(KafkaTransportProtocol protocol, InternalEventProcessor<byte[]>
eventProcessor)
throws SpRuntimeException {
eventProcessor)
throws SpRuntimeException {
LOG.info("Kafka consumer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
if (protocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
this.patternTopic = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,31 @@

package org.apache.streampipes.messaging.kafka;

import org.apache.kafka.clients.admin.*;
import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import java.io.Serializable;
import java.util.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, Serializable {
Expand All @@ -50,15 +59,16 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S

private static final Logger LOG = LoggerFactory.getLogger(SpKafkaProducer.class);

public SpKafkaProducer() { }
public SpKafkaProducer() {
}

// TODO backwards compatibility, remove later
public SpKafkaProducer(String url,
String topic,
List<KafkaConfigAppender> appenders) {
String[] urlParts = url.split(COLON);
KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
Integer.parseInt(urlParts[1]), topic);
Integer.parseInt(urlParts[1]), topic);
this.brokerUrl = url;
this.topic = topic;
this.producer = new KafkaProducer<>(makeProperties(protocol, appenders));
Expand Down Expand Up @@ -115,7 +125,8 @@ private void createKafaTopic(KafkaTransportProtocol settings) throws ExecutionEx

if (!topicExists(topics)) {
Map<String, String> topicConfig = new HashMap<>();
String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists()
? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);

final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

public class SpKafkaProtocol implements SpProtocolDefinition<KafkaTransportProtocol> {

private EventConsumer<KafkaTransportProtocol> kafkaConsumer;
private EventProducer<KafkaTransportProtocol> kafkaProducer;
private final EventConsumer<KafkaTransportProtocol> kafkaConsumer;
private final EventProducer<KafkaTransportProtocol> kafkaProducer;

public SpKafkaProtocol() {
this.kafkaConsumer = new SpKafkaConsumer();
Expand Down
Loading

0 comments on commit 35d55fa

Please sign in to comment.