Fixed --jar not working for KafkaUtils and improved error message
tdas committed Feb 26, 2015
1 parent d20559b commit 7b88be8
Showing 2 changed files with 45 additions and 14 deletions.
21 changes: 20 additions & 1 deletion external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, StringDecoder}
+import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}

import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.{SparkContext, SparkException}
@@ -532,3 +532,22 @@ object KafkaUtils {
    )
  }
}

+private[kafka]
+class KafkaUtilsPythonHelper {
+  def createStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
+    KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+      jssc,
+      classOf[Array[Byte]],
+      classOf[Array[Byte]],
+      classOf[DefaultDecoder],
+      classOf[DefaultDecoder],
+      kafkaParams,
+      topics,
+      storageLevel)
+  }
+}
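
Note: this helper works around the fact that Py4J resolves names like ssc._jvm.KafkaUtils against the JVM's system classpath, which does not include jars passed to spark-submit with --jars; those jars are only visible through the driver thread's context classloader. A concrete class with a no-arg constructor can be loaded reflectively through that classloader, which is what the Python change below does. A minimal sketch of the pattern, assuming an active StreamingContext ssc:

    # Load a class shipped via --jars through the context classloader, since
    # Py4J's jvm view only sees the system classpath. Assumes `ssc` is a live
    # pyspark.streaming.StreamingContext.
    loader = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
    helper_class = loader.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
    helper = helper_class.newInstance()  # possible because the helper is concrete with a no-arg constructor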
38 changes: 25 additions & 13 deletions python/pyspark/streaming/kafka.py
@@ -16,7 +16,7 @@
#

from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError
+from py4j.java_gateway import java_import, Py4JError, Py4JJavaError

from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -63,20 +63,32 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
    jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
    jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

-    def getClassByName(name):
-        return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
-
    try:
-        array = getClassByName("[B")
-        decoder = getClassByName("kafka.serializer.DefaultDecoder")
-        jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
-                                                   jparam, jtopics, jlevel)
-    except Py4JError, e:
+        helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+        helper = helperClass.newInstance()
+        jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+    except Py4JJavaError, e:
+        # TODO: use --jars once it also works on the driver
-        if not e.message or 'call a package' in e.message:
-            print "No kafka package, please put the assembly jar into classpath:"
-            print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
-                  "scala-*/spark-streaming-kafka-assembly-*.jar"
+        if 'ClassNotFoundException' in str(e.java_exception):
+            print """
+________________________________________________________________________________________________
+
+  Spark Streaming's Kafka libraries not found in class path. Try one of the following.
+
+  1. Include the Kafka library and its dependencies with the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
+
+________________________________________________________________________________________________
+""" % (ssc.sparkContext.version, ssc.sparkContext.version)
        raise e
    ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
    stream = DStream(jstream, ssc, ser)
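
For context, a driver script that exercises this code path might look like the sketch below; the ZooKeeper address, group id, topic name, and script name are illustrative placeholders, not values from this commit (Python 2 style, to match the code above).

    # my_kafka_driver.py -- illustrative sketch only
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaStreamCheck")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    # topics maps topic name -> number of consumer threads for the receiver
    stream = KafkaUtils.createStream(ssc, "zk-host:2181", "test-group", {"test-topic": 1})
    stream.map(lambda kv: kv[1]).pprint()  # values arrive as raw bytes (DefaultDecoder)

    ssc.start()
    ssc.awaitTermination()

Submitted with the assembly jar on the classpath, as the new error message suggests:

    $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> my_kafka_driver.py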
