Skip to content

Commit

Permalink
Enable checkstyle for streampipes-messaging-kafka (#820)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Dec 1, 2022
1 parent 99a6b8b commit 1fe4ac5
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 72 deletions.
24 changes: 23 additions & 1 deletion streampipes-messaging-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,26 @@
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
</project>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,33 @@

package org.apache.streampipes.messaging.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.kafka.config.ConsumerConfigFactory;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.WildcardTopicDefinition;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, Runnable,
Serializable {
Serializable {

private String topic;
private InternalEventProcessor<byte[]> eventProcessor;
Expand Down Expand Up @@ -97,9 +102,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
Duration duration = Duration.of(100, ChronoUnit.MILLIS);
while (isRunning) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
records.forEach(record -> {
eventProcessor.onEvent(record.value());
});
records.forEach(record -> eventProcessor.onEvent(record.value()));
}
LOG.info("Closing Kafka Consumer.");
consumer.close();
Expand All @@ -111,14 +114,14 @@ private String replaceWildcardWithPatternFormat(String topic) {
}

private Properties makeProperties(KafkaTransportProtocol protocol,
List<KafkaConfigAppender> appenders) {
List<KafkaConfigAppender> appenders) {
return new ConsumerConfigFactory(protocol).buildProperties(appenders);
}

@Override
public void connect(KafkaTransportProtocol protocol, InternalEventProcessor<byte[]>
eventProcessor)
throws SpRuntimeException {
eventProcessor)
throws SpRuntimeException {
LOG.info("Kafka consumer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
if (protocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
this.patternTopic = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,31 @@

package org.apache.streampipes.messaging.kafka;

import org.apache.kafka.clients.admin.*;
import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import java.io.Serializable;
import java.util.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, Serializable {
Expand All @@ -50,15 +59,16 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S

private static final Logger LOG = LoggerFactory.getLogger(SpKafkaProducer.class);

public SpKafkaProducer() { }
public SpKafkaProducer() {
}

// TODO backwards compatibility, remove later
public SpKafkaProducer(String url,
String topic,
List<KafkaConfigAppender> appenders) {
String[] urlParts = url.split(COLON);
KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
Integer.parseInt(urlParts[1]), topic);
Integer.parseInt(urlParts[1]), topic);
this.brokerUrl = url;
this.topic = topic;
this.producer = new KafkaProducer<>(makeProperties(protocol, appenders));
Expand Down Expand Up @@ -115,7 +125,8 @@ private void createKafaTopic(KafkaTransportProtocol settings) throws ExecutionEx

if (!topicExists(topics)) {
Map<String, String> topicConfig = new HashMap<>();
String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists()
? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);

final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

public class SpKafkaProtocol implements SpProtocolDefinition<KafkaTransportProtocol> {

private EventConsumer<KafkaTransportProtocol> kafkaConsumer;
private EventProducer<KafkaTransportProtocol> kafkaProducer;
private final EventConsumer<KafkaTransportProtocol> kafkaConsumer;
private final EventProducer<KafkaTransportProtocol> kafkaProducer;

public SpKafkaProtocol() {
this.kafkaConsumer = new SpKafkaConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
*/
package org.apache.streampipes.messaging.kafka.config;

import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import java.util.Properties;
import java.util.UUID;
Expand All @@ -44,10 +45,10 @@ public Properties makeDefaultProperties() {
props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId, UUID.randomUUID().toString()));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT);
AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG_DEFAULT);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
getConfigOrDefault(protocol::getMessageMaxBytes, FETCH_MAX_BYTES_CONFIG_DEFAULT));
getConfigOrDefault(protocol::getMessageMaxBytes, FETCH_MAX_BYTES_CONFIG_DEFAULT));

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@

public interface KafkaConfigAppender {

void appendConfig(Properties props);
void appendConfig(Properties props);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.streampipes.messaging.kafka.config;

import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import java.util.Properties;

Expand All @@ -37,26 +38,23 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
private static final String VALUE_SERIALIZER_DEFAULT = ByteArraySerializer.class.getName();



public ProducerConfigFactory(KafkaTransportProtocol protocol) {
super(protocol);


}

@Override
public Properties makeDefaultProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
props.put(ProducerConfig.ACKS_CONFIG, getConfigOrDefault(protocol::getAcks,
ACKS_CONFIG_DEFAULT));
ACKS_CONFIG_DEFAULT));
props.put(ProducerConfig.RETRIES_CONFIG, RETRIES_CONFIG_DEFAULT);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,
getConfigOrDefault(protocol::getBatchSize, BATCH_SIZE_CONFIG_DEFAULT));
getConfigOrDefault(protocol::getBatchSize, BATCH_SIZE_CONFIG_DEFAULT));
props.put(ProducerConfig.LINGER_MS_CONFIG,
getConfigOrDefault(protocol::getLingerMs, LINGER_MS_DEFAULT));
getConfigOrDefault(protocol::getLingerMs, LINGER_MS_DEFAULT));
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getConfigOrDefault(protocol::getMaxRequestSize,
MAX_REQUEST_SIZE_CONFIG_DEFAULT));
MAX_REQUEST_SIZE_CONFIG_DEFAULT));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG_DEFAULT);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,26 @@

public class KafkaSecuritySaslPlainConfig extends KafkaSecurityConfig {

private final String username;
private final String password;
private final String username;
private final String password;

public KafkaSecuritySaslPlainConfig(String username, String password) {
this.username = username;
this.password = password;
}
public KafkaSecuritySaslPlainConfig(String username, String password) {
this.username = username;
this.password = password;
}

@Override
public void appendConfig(Properties props) {
@Override
public void appendConfig(Properties props) {

props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());

String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
props.put(SaslConfigs.SASL_JAAS_CONFIG, SASL_JAAS_CONFIG);
}
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ username
+ "\" password=\""
+ password
+ "\";";

props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,26 @@

public class KafkaSecuritySaslSSLConfig extends KafkaSecurityConfig {

private final String username;
private final String password;
private final String username;
private final String password;

public KafkaSecuritySaslSSLConfig(String username, String password) {
this.username = username;
this.password = password;
}
public KafkaSecuritySaslSSLConfig(String username, String password) {
this.username = username;
this.password = password;
}

@Override
public void appendConfig(Properties props) {
@Override
public void appendConfig(Properties props) {

props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString());
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString());

String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
props.put(SaslConfigs.SASL_JAAS_CONFIG, SASL_JAAS_CONFIG);
}
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ username
+ "\" password=\""
+ password
+ "\";";

props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@

public class KafkaSecurityUnauthenticatedPlainConfig extends KafkaSecurityConfig {

@Override
public void appendConfig(Properties props) {

@Override
public void appendConfig(Properties props) {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

public class KafkaSecurityUnauthenticatedSSLConfig extends KafkaSecurityConfig {

@Override
public void appendConfig(Properties props) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.toString());
}
@Override
public void appendConfig(Properties props) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.toString());
}
}
1 change: 0 additions & 1 deletion streampipes-messaging-nats/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,4 @@
</plugin>
</plugins>
</build>

</project>

0 comments on commit 1fe4ac5

Please sign in to comment.