Skip to content

Commit

Permalink
Merge pull request #178 from InterestingLab/rickyhuo.enhance.kafka
Browse files Browse the repository at this point in the history
Rickyhuo.enhance.kafka
  • Loading branch information
kid-xiong authored Nov 18, 2018
2 parents 738353c + 048abad commit 8226c26
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 20 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ assemblyMergeStrategy in assembly := {
case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
case "META-INF/mailcap" => MergeStrategy.last
case "META-INF/mimetypes.default" => MergeStrategy.last
case "SimpleLog.class" => MergeStrategy.last
case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
case "UnusedStubClass.class" => MergeStrategy.last
case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
Expand Down
2 changes: 1 addition & 1 deletion docs/en/configuration/_sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
- [Fake](/en/configuration/input-plugins/Fake)
- [File](/en/configuration/input-plugins/File)
- [Hdfs](/en/configuration/input-plugins/Hdfs)
- [Kafka](/en/configuration/input-plugins/Kafka)
- [KafkaStream](/en/configuration/input-plugins/KafkaStream)
- [S3](/en/configuration/input-plugins/S3)
- [Socket](/en/configuration/input-plugins/Socket)
- [Filter Plugin](/en/configuration/filter-plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ This plugin uses Kafka Old Consumer. Supporting Kafka >= 0.8.2.X
| --- | --- | --- | --- |
| [topics](#topics-string) | string | yes | - |
| [consumer.group.id](#consumergroupid-string) | string | yes | - |
| [consumer.zookeeper.connect](#consumerzookeeperconnect-string) | string | yes | - |
| [consumer.bootstrap.servers](#consumerbootstrapservers-string) | string | yes | - |
| [consumer.*](#consumer-string) | string | no | - |

Expand All @@ -29,10 +28,6 @@ Kafka topic. Multiple topics separated by commas. For example, "tpc1,tpc2".

Kafka consumer group id, a unique string that identifies the consumer group this consumer belongs to.

##### consumer.zookeeper.connect [string]

Specifies the ZooKeeper connection string in the form `hostname:port` where host and port are the host and port of a ZooKeeper server

##### consumer.bootstrap.servers [string]

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.This string should be in the form `host1:port1,host2:port2,.... `
Expand All @@ -50,7 +45,6 @@ The way to specify parameters is to use the prefix "consumer" before the paramet
kafka {
topics = "waterdrop"
consumer.bootstrap.servers = "localhost:9092"
consumer.zookeeper.connect = "localhost:2181"
consumer.group.id = "waterdrop_group"
consumer.rebalance.max.retries = 100
}
Expand Down
6 changes: 0 additions & 6 deletions docs/zh-cn/configuration/input-plugins/KafkaStream.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Kafka Input实现Kafka的Old Consumer客户端, 从Kafka消费数据。支持的
| --- | --- | --- | --- |
| [topics](#topics-string) | string | yes | - |
| [consumer.group.id](#consumergroupid-string) | string | yes | - |
| [consumer.zookeeper.connect](#consumerzookeeperconnect-string) | string | yes | - |
| [consumer.bootstrap.servers](#consumerbootstrapservers-string) | string | yes | - |
| [consumer.*](#consumer-string) | string | no | - |

Expand All @@ -27,10 +26,6 @@ Kafka topic名称。如果有多个topic,用","分割,例如: "tpc1,tpc2"。

Kafka consumer group id,用于区分不同的消费组。

##### consumer.zookeeper.connect [string]

Kafka集群的Zookeeper地址

##### consumer.bootstrap.servers [string]

Kafka集群地址,多个用","隔开
Expand All @@ -47,7 +42,6 @@ Kafka集群地址,多个用","隔开
kafkaStream {
topics = "waterdrop"
consumer.bootstrap.servers = "localhost:9092"
consumer.zookeeper.connect = "localhost:2181"
consumer.group.id = "waterdrop_group"
consumer.rebalance.max.retries = 100
}
Expand Down
3 changes: 2 additions & 1 deletion waterdrop-core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ providedDeps match {
libraryDependencies ++= Seq(

"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
exclude("org.spark-project.spark", "unused"),
exclude("org.spark-project.spark", "unused")
exclude("net.jpountz.lz4", "unused"),
"com.typesafe" % "config" % "1.3.1",
"com.alibaba" % "QLExpress" % "3.2.0",
"com.alibaba" % "fastjson" % "1.2.47",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ class KafkaStream extends BaseStreamingInput {
config.hasPath("topics") match {
case true => {
val consumerConfig = config.getConfig(consumerPrefix)
consumerConfig.hasPath("zookeeper.connect") &&
!consumerConfig.getString("zookeeper.connect").trim.isEmpty &&
consumerConfig.hasPath("group.id") &&
consumerConfig.hasPath("group.id") &&
!consumerConfig.getString("group.id").trim.isEmpty match {
case true => (true, "")
case false =>
(false, "please specify [consumer.zookeeper.connect] and [consumer.group.id] as non-empty string")
(false, "please specify [consumer.group.id] as non-empty string")
}
}
case false => (false, "please specify [topics] as non-empty string, multiple topics separated by \",\"")
Expand Down

0 comments on commit 8226c26

Please sign in to comment.