Commit
Add SequenceFile and Hadoop InputFormat support to PythonRDD
MLnick committed Dec 15, 2013
1 parent 4e7c9e3 commit 818a1e6
Showing 1 changed file with 34 additions and 35 deletions.
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,31 +19,21 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
+import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
 
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
-import org.apache.spark.broadcast.Broadcast
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
-import org.apache.hadoop.io._
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat
-import org.apache.spark.api.java.function.PairFunction
-import scala.util.{Success, Failure, Try}
-import org.msgpack
-import org.msgpack.ScalaMessagePack
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapred.{JobConf, SequenceFileOutputFormat}
-import org.apache.hadoop.conf.Configuration
-import java.util
 
-private[spark] class PythonRDD[T: ClassManifest](
+private[spark] class PythonRDD[T: ClassTag](
     parent: RDD[T],
     command: Array[Byte],
    envVars: JMap[String, String],
@@ -218,21 +208,20 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
-  // PySpark / Hadoop InputFormat stuff
+  // PySpark / Hadoop InputFormat //
 
   /** Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
-  def sequenceFile[K ,V](sc: JavaSparkContext,
+  def sequenceFile[K, V](sc: JavaSparkContext,
       path: String,
       keyClass: String,
       valueClass: String,
       keyWrapper: String,
       valueWrapper: String,
       minSplits: Int) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClass)).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClass)).asInstanceOf[ClassManifest[V]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val converted = SerDeUtil.convertRDD[K, V](rdd)
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
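
Note: the hunk above swaps the deprecated ClassManifest machinery for ClassTag. As a standalone, hedged sketch of the new pattern (the helper name tagFor is mine, not part of the diff): the Python side passes key and value types as class-name strings over Py4J, so the Scala side must rebuild a ClassTag for a class known only at runtime.

import scala.reflect.ClassTag

object ClassTagSketch {
  // ClassTag(Class.forName(...)) replaces ClassManifest.fromClass(...),
  // and runtimeClass replaces the deprecated ClassManifest#erasure.
  def tagFor[T](className: String): ClassTag[T] =
    ClassTag(Class.forName(className)).asInstanceOf[ClassTag[T]]

  def main(args: Array[String]): Unit = {
    implicit val kcm: ClassTag[String] = tagFor[String]("java.lang.String")
    println(kcm.runtimeClass) // prints: class java.lang.String
  }
}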
@@ -285,12 +274,12 @@ private[spark] object PythonRDD extends Logging {
      keyClazz: String,
      valueClazz: String,
      conf: Configuration) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
-    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
     val rdd = if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
@@ -299,6 +288,10 @@
     rdd
   }
 
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class
+   */
   def hadoopFile[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
       path: String,
       inputFormatClazz: String,
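
For context, a hedged usage sketch of what this entry point ultimately performs on the Scala side: an old-API (mapred) hadoopFile call with explicit InputFormat, key, and value classes, which the wrapper resolves from the class-name strings sent by Python. The path and master below are placeholders, not values from the diff.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

object HadoopFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "hadoopFileSketch")
    // Explicit InputFormat, key and value classes, as in the wrapped call.
    val rdd = sc.hadoopFile("/path/to/data", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])
    println(rdd.map(_._2.toString).first())
  }
}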
@@ -317,6 +310,10 @@
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
+   * using an arbitrary [[org.apache.hadoop.mapred.InputFormat]], key and value class
+   */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
       inputFormatClazz: String,
       keyClazz: String,
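
The new doc comment above mentions a Configuration converted from a map passed in from Python. A minimal sketch of such a conversion, assuming the map arrives as a java.util.Map[String, String] over Py4J; the helper name mapToConf is an assumption, not a method from the diff.

import java.util.{Map => JMap}

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration

object ConfSketch {
  // Copy each key/value pair from the Python-supplied map into a fresh
  // Hadoop Configuration.
  def mapToConf(map: JMap[String, String]): Configuration = {
    val conf = new Configuration()
    map.foreach { case (k, v) => conf.set(k, v) }
    conf
  }
}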
@@ -338,12 +335,12 @@
      keyClazz: String,
      valueClazz: String,
      conf: Configuration) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
-    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
     val rdd = if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
@@ -352,6 +349,8 @@
     rdd
   }
 
+  // **** //
+
   def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
     elem match {
       case bytes: Array[Byte] =>
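
The trailing context shows writeToStream dispatching on Array[Byte]. A sketch of the framing such a case presumably writes — a length prefix followed by the payload, so the Python worker can read an exact number of bytes per element. This is inferred from the visible match arm, not shown in the diff.

import java.io.{ByteArrayOutputStream, DataOutputStream}

object FramingSketch {
  // Length-prefixed framing for a raw byte record (assumed behavior).
  def writeBytes(bytes: Array[Byte], dataOut: DataOutputStream): Unit = {
    dataOut.writeInt(bytes.length)
    dataOut.write(bytes)
  }

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    writeBytes("pickled-record".getBytes("UTF-8"), new DataOutputStream(out))
    println(out.size()) // 4-byte length prefix + payload
  }
}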
