Skip to content

Commit

Permalink
Fixed long line and improved documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Feb 26, 2015
1 parent 7b88be8 commit c1fdf35
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,15 @@ object KafkaUtils {
}
}

/**
* This is a helper class that wraps KafkaUtils.createStream() in a more
* Python-friendly class and function so that it can be easily
* instantiated and called from Python's KafkaUtils (see SPARK-6027).
*
* The zero-arg constructor helps instantiate this class from the Class object
* classOf[KafkaUtilsPythonHelper].newInstance(), and createStream()
* takes care of known parameters instead of passing them from Python.
*/
private[kafka]
class KafkaUtilsPythonHelper {
def createStream(
Expand Down
6 changes: 4 additions & 2 deletions python/pyspark/streaming/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

try:
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
# Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
helper = helperClass.newInstance()
jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
except Py4JJavaError, e:
Expand All @@ -78,7 +80,7 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
1. Include the Kafka library and its dependencies with in the
spark-submit command as
$ bin/spark-submit --package org.apache.spark:spark-streaming-kafka:%s ...
$ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
Expand Down

0 comments on commit c1fdf35

Please sign in to comment.