Skip to content

Commit

Permalink
Merge pull request #1 from ramindu90/master
Browse files Browse the repository at this point in the history
Moving Kafka transport to extensions
  • Loading branch information
ramindu90 authored Jun 5, 2017
2 parents a4158de + c52f400 commit 1a320e4
Show file tree
Hide file tree
Showing 13 changed files with 2,101 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
hs_err_pid*
117 changes: 117 additions & 0 deletions component/input-transport/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
~ Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
~
~ WSO2 Inc. licenses this file to you under the Apache License,
~ Version 2.0 (the "License"); you may not use this file except
~ in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.wso2.extension.siddhi</groupId>
<artifactId>siddhi-io-kafka</artifactId>
<version>4.0.0-M5-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>bundle</packaging>

<artifactId>siddhi-io-kafka-input</artifactId>
<name>Siddhi Extension - Kafka Input Transport</name>


<dependencies>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-core</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-query-api</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.log4j.wso2</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>

<!-- Testing purposes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>

        <!-- TODO: Remove these dependencies before release; they were added only to test the end-to-end flow. -->
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-extension-kafka-output-transport</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-extension-text-input-mapper</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-extension-text-output-mapper</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Name>${project.artifactId}</Bundle-Name>
<Export-Package>
org.wso2.siddhi.extension.input.transport.kafka.*
</Export-Package>
<Import-Package>
*;resolution:=optional
</Import-Package>
<Include-Resource>
META-INF=target/classes/META-INF
</Include-Resource>
<DynamicImport-Package>*</DynamicImport-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.siddhi.extension.input.transport.kafka;

import org.apache.log4j.Logger;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;

/**
* This processes the Kafka messages using a thread pool.
*/
/**
 * Manages a group of {@link KafkaConsumerThread}s that consume the given Kafka topics and
 * partitions. Depending on the threading option, consumption happens on the caller's thread
 * (single-threaded), one thread per topic, or one thread per topic-partition pair.
 */
public class ConsumerKafkaGroup {
    private static final Logger log = Logger.getLogger(ConsumerKafkaGroup.class);
    private final String[] topics;
    private final String[] partitions;
    private final Properties props;
    private final List<KafkaConsumerThread> kafkaConsumerThreadList = new ArrayList<>();
    // Offsets to resume consumption from; handed to every consumer thread created in run().
    private final Map<String, Map<Integer, Long>> topicOffsetMap;
    private final ScheduledExecutorService executorService;
    private final String threadingOption;

    /**
     * @param topics          topics to consume from
     * @param partitions      partitions to consume from (may apply to all topics)
     * @param props           Kafka consumer configuration properties
     * @param topicOffsetMap  topic -> (partition -> offset) map to resume from
     * @param threadingOption one of {@code KafkaSource.SINGLE_THREADED}, {@code TOPIC_WISE} or
     *                        {@code PARTITION_WISE}
     * @param executorService executor used to run consumer threads in the multi-threaded modes
     */
    ConsumerKafkaGroup(String[] topics, String[] partitions, Properties props,
                       Map<String, Map<Integer, Long>> topicOffsetMap, String threadingOption,
                       ScheduledExecutorService executorService) {
        this.threadingOption = threadingOption;
        this.topicOffsetMap = topicOffsetMap;
        this.topics = topics;
        this.partitions = partitions;
        this.props = props;
        this.executorService = executorService;
    }

    /** Pauses all consumer threads in this group. */
    void pause() {
        kafkaConsumerThreadList.forEach(KafkaConsumerThread::pause);
    }

    /** Resumes all consumer threads in this group. */
    void resume() {
        kafkaConsumerThreadList.forEach(KafkaConsumerThread::resume);
    }

    /** Restores every consumer thread to the offsets recorded in the given snapshot. */
    void restore(final Map<String, Map<Integer, Long>> topic) {
        kafkaConsumerThreadList.forEach(kafkaConsumerThread -> kafkaConsumerThread.restore(topic));
    }

    /** Shuts down all consumer threads in this group. */
    void shutdown() {
        kafkaConsumerThreadList.forEach(KafkaConsumerThread::shutdownConsumer);
    }

    /**
     * Creates and starts the consumer threads according to the configured threading option.
     * <p>
     * NOTE(review): in SINGLE_THREADED mode {@code run()} is invoked directly, so consumption
     * happens on the caller's thread and this method does not return until that consumer stops —
     * confirm this blocking behaviour is intended by the caller. The other modes submit the
     * threads to the executor and return immediately.
     *
     * @param sourceEventListener listener to which consumed events are delivered
     */
    void run(SourceEventListener sourceEventListener) {
        try {
            if (KafkaSource.SINGLE_THREADED.equals(threadingOption)) {
                KafkaConsumerThread kafkaConsumerThread =
                        new KafkaConsumerThread(sourceEventListener, topics, partitions, props, topicOffsetMap);
                kafkaConsumerThreadList.add(kafkaConsumerThread);
                log.info("Kafka Consumer thread starting to listen on topic/s: " + Arrays.toString(topics) +
                        " with partition/s: " + Arrays.toString(partitions));
                // Intentionally run(), not submit(): consumes on the calling thread.
                kafkaConsumerThread.run();
            } else if (KafkaSource.TOPIC_WISE.equals(threadingOption)) {
                // One consumer thread per topic, each covering all configured partitions.
                for (String topic : topics) {
                    KafkaConsumerThread kafkaConsumerThread =
                            new KafkaConsumerThread(sourceEventListener, new String[]{topic}, partitions, props,
                                    topicOffsetMap);
                    kafkaConsumerThreadList.add(kafkaConsumerThread);
                    executorService.submit(kafkaConsumerThread);
                    log.info("Kafka Consumer thread starting to listen on topic: " + topic +
                            " with partition/s: " + Arrays.toString(partitions));
                }
            } else if (KafkaSource.PARTITION_WISE.equals(threadingOption)) {
                // One consumer thread per (topic, partition) pair for maximum parallelism.
                for (String topic : topics) {
                    for (String partition : partitions) {
                        KafkaConsumerThread kafkaConsumerThread =
                                new KafkaConsumerThread(sourceEventListener, new String[]{topic},
                                        new String[]{partition}, props, topicOffsetMap);
                        kafkaConsumerThreadList.add(kafkaConsumerThread);
                        executorService.submit(kafkaConsumerThread);
                        log.info("Kafka Consumer thread starting to listen on topic: " + topic +
                                " with partition: " + partition);
                    }
                }
            }
        } catch (Throwable t) {
            // Broad catch is deliberate: a failure to start one consumer must not kill the caller.
            log.error("Error while creating KafkaConsumerThread for topic/s: " + Arrays.toString(topics), t);
        }
    }

    /**
     * Aggregates the current topic/partition offsets tracked by every consumer thread in this
     * group into a single map.
     *
     * @return topic -> (partition -> offset) snapshot across all consumer threads
     */
    public Map<String, Map<Integer, Long>> getTopicOffsetMap() {
        Map<String, Map<Integer, Long>> aggregatedOffsets = new HashMap<>();
        for (KafkaConsumerThread kafkaConsumerThread : kafkaConsumerThreadList) {
            // Threads consume disjoint topics (topic-wise/partition-wise), so putAll is safe;
            // in single-threaded mode there is only one thread.
            aggregatedOffsets.putAll(kafkaConsumerThread.getTopicOffsetMap());
        }
        return aggregatedOffsets;
    }
}
Loading

0 comments on commit 1a320e4

Please sign in to comment.