diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b411e677420eb..70357a88b673c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,31 +19,21 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
+import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
-import org.apache.spark.broadcast.Broadcast
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
-import org.apache.hadoop.io._
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat
-import org.apache.spark.api.java.function.PairFunction
-import scala.util.{Success, Failure, Try}
-import org.msgpack
-import org.msgpack.ScalaMessagePack
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapred.{JobConf, SequenceFileOutputFormat}
-import org.apache.hadoop.conf.Configuration
-import java.util
-
-private[spark] class PythonRDD[T: ClassManifest](
+
+private[spark] class PythonRDD[T: ClassTag](
     parent: RDD[T],
     command: Array[Byte],
     envVars: JMap[String, String],
@@ -218,21 +208,20 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
-  // PySpark / Hadoop InputFormat stuff
+  // PySpark / Hadoop InputFormat
 
   /** Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
-  def sequenceFile[K ,V](sc: JavaSparkContext,
+  def sequenceFile[K, V](sc: JavaSparkContext,
      path: String,
      keyClass: String,
      valueClass: String,
      keyWrapper: String,
      valueWrapper: String,
      minSplits: Int) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClass)).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClass)).asInstanceOf[ClassManifest[V]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val converted = SerDeUtil.convertRDD[K, V](rdd)
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
@@ -285,12 +274,12 @@ private[spark] object PythonRDD extends Logging {
      keyClazz: String,
      valueClazz: String,
      conf: Configuration) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
-    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
     val rdd = if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
@@ -299,6 +288,10 @@ private[spark] object PythonRDD extends Logging {
     rdd
   }
 
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class
+   */
   def hadoopFile[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
      path: String,
      inputFormatClazz: String,
      keyClazz: String,
@@ -317,6 +310,10 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
+   * using an arbitrary [[org.apache.hadoop.mapred.InputFormat]], key and value class
+   */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
      inputFormatClazz: String,
      keyClazz: String,
@@ -338,12 +335,12 @@ private[spark] object PythonRDD extends Logging {
      keyClazz: String,
      valueClazz: String,
      conf: Configuration) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
-    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
     val rdd = if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
@@ -352,6 +349,8 @@ private[spark] object PythonRDD extends Logging {
     rdd
   }
 
+  // **** //
+
   def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
     elem match {
       case bytes: Array[Byte] =>
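
Note: the snippet below is not part of the patch. It is a minimal, self-contained sketch of the ClassManifest-to-ClassTag pattern the diff applies in sequenceFile, newAPIHadoopFile/RDD and hadoopFile/RDD: a class name passed in as a string (for example from PySpark) is resolved via Class.forName into a ClassTag, and its runtimeClass is then handed to the corresponding SparkContext method. The helper name classTagFor and the java.lang.String example are illustrative assumptions, not code from the PR.

// Sketch only: mirrors the ClassTag resolution used by the patched helpers above.
import scala.reflect.ClassTag

object ClassTagSketch {
  // Build a ClassTag for a class named at runtime (e.g. a key or value class sent from Python).
  def classTagFor[T](className: String): ClassTag[T] =
    ClassTag(Class.forName(className)).asInstanceOf[ClassTag[T]]

  def main(args: Array[String]): Unit = {
    // Resolve a "key class" the way the patched helpers do, then recover its runtime Class.
    val kcm = classTagFor[AnyRef]("java.lang.String")
    val kc: Class[AnyRef] = kcm.runtimeClass.asInstanceOf[Class[AnyRef]]
    println(kc.getName) // prints "java.lang.String"
  }
}

The same cast-based pattern replaces ClassManifest.fromClass(...).erasure with ClassTag(...).runtimeClass throughout the patch, since ClassManifest was deprecated in favour of ClassTag in Scala 2.10.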