diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala index adaa1ef6cf9ff..f3b05e1243045 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -17,8 +17,9 @@ package org.apache.spark.api.python +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD -import org.apache.spark.Logging +import org.apache.spark.{Logging, SerializableWritable, SparkException} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io._ import scala.util.{Failure, Success, Try} @@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental * transformation code by overriding the convert method. */ @Experimental -trait Converter[T, U] extends Serializable { +trait Converter[T, + U] extends Serializable { def convert(obj: T): U } private[python] object Converter extends Logging { - def getInstance(converterClass: Option[String]): Converter[Any, Any] = { + def getInstance(converterClass: Option[String], + defaultConverter: Converter[Any, Any]): Converter[Any, Any] = { converterClass.map { cc => Try { val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]] @@ -49,7 +51,7 @@ private[python] object Converter extends Logging { logError(s"Failed to load converter: $cc") throw err } - }.getOrElse { new DefaultConverter } + }.getOrElse { defaultConverter } } } @@ -57,7 +59,9 @@ private[python] object Converter extends Logging { * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects. * Other objects are passed through without conversion. */ -private[python] class DefaultConverter extends Converter[Any, Any] { +private[python] class WritableToJavaConverter( + conf: Broadcast[SerializableWritable[Configuration]], + batchSize: Int) extends Converter[Any, Any] { /** * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or @@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] { case fw: FloatWritable => fw.get() case t: Text => t.toString case bw: BooleanWritable => bw.get() - case byw: BytesWritable => byw.getBytes + case byw: BytesWritable => + val bytes = new Array[Byte](byw.getLength) + System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength) + bytes case n: NullWritable => null - case aw: ArrayWritable => aw.get().map(convertWritable(_)) - case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) => - (convertWritable(k), convertWritable(v)) - }.toMap) + case aw: ArrayWritable => + // Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples. + // Since we can't determine element types for empty arrays, we will not attempt to + // convert to primitive arrays (which get pickled to Python arrays). Users may want + // write custom converters for arrays if they know the element types a priori. 
+ aw.get().map(convertWritable(_)) + case mw: MapWritable => + val map = new java.util.HashMap[Any, Any]() + mw.foreach { case (k, v) => + map.put(convertWritable(k), convertWritable(v)) + } + map + case w: Writable => + if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w case other => other } } - def convert(obj: Any): Any = { + override def convert(obj: Any): Any = { obj match { case writable: Writable => convertWritable(writable) @@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] { } } +/** + * A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array + * types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]] + * to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and + * [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in + * PySpark RDD `saveAsNewAPIHadoopFile` doctest. + */ +private[python] class JavaToWritableConverter extends Converter[Any, Writable] { + + /** + * Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not + * supported out-of-the-box. + */ + private def convertToWritable(obj: Any): Writable = { + import collection.JavaConversions._ + obj match { + case i: java.lang.Integer => new IntWritable(i) + case d: java.lang.Double => new DoubleWritable(d) + case l: java.lang.Long => new LongWritable(l) + case f: java.lang.Float => new FloatWritable(f) + case s: java.lang.String => new Text(s) + case b: java.lang.Boolean => new BooleanWritable(b) + case aob: Array[Byte] => new BytesWritable(aob) + case null => NullWritable.get() + case map: java.util.Map[_, _] => + val mapWritable = new MapWritable() + map.foreach { case (k, v) => + mapWritable.put(convertToWritable(k), convertToWritable(v)) + } + mapWritable + case other => throw new SparkException( + s"Data of type ${other.getClass.getName} cannot be used") + } + } + + override def convert(obj: Any): Writable = obj match { + case writable: Writable => writable + case other => convertToWritable(other) + } +} + /** Utilities for working with Python objects <-> Hadoop-related objects */ private[python] object PythonHadoopUtil { @@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil { /** * Converts an RDD of key-value pairs, where key and/or value could be instances of - * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)] + * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa. 
*/ def convertRDD[K, V](rdd: RDD[(K, V)], keyConverter: Converter[Any, Any], diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index d6b0988641a97..da3f1ec0be0bf 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -23,15 +23,18 @@ import java.nio.charset.Charset import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections} import scala.collection.JavaConversions._ +import scala.language.existentials import scala.reflect.ClassTag import scala.util.Try import net.razorvine.pickle.{Pickler, Unpickler} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.{InputFormat, JobConf} -import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.hadoop.io.compress.CompressionCodec +import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat} import org.apache.spark._ +import org.apache.spark.SparkContext._ import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD @@ -365,19 +368,17 @@ private[spark] object PythonRDD extends Logging { valueClassMaybeNull: String, keyConverterClass: String, valueConverterClass: String, - minSplits: Int) = { + minSplits: Int, + batchSize: Int) = { val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text") val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text") - implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]] - implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]] - val kc = kcm.runtimeClass.asInstanceOf[Class[K]] - val vc = vcm.runtimeClass.asInstanceOf[Class[V]] - + val kc = Class.forName(keyClass).asInstanceOf[Class[K]] + val vc = Class.forName(valueClass).asInstanceOf[Class[V]] val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits) - val keyConverter = Converter.getInstance(Option(keyConverterClass)) - val valueConverter = Converter.getInstance(Option(valueConverterClass)) - val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter) - JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration())) + val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, + new WritableToJavaConverter(confBroadcasted, batchSize)) + JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } /** @@ -394,17 +395,16 @@ private[spark] object PythonRDD extends Logging { valueClass: String, keyConverterClass: String, valueConverterClass: String, - confAsMap: java.util.HashMap[String, String]) = { - val conf = PythonHadoopUtil.mapToConf(confAsMap) - val baseConf = sc.hadoopConfiguration() - val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf) + confAsMap: java.util.HashMap[String, String], + batchSize: Int) = { + val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration()) val rdd = newAPIHadoopRDDFromClassNames[K, V, F](sc, Some(path), inputFormatClass, keyClass, valueClass, mergedConf) - val keyConverter = Converter.getInstance(Option(keyConverterClass)) - val valueConverter = Converter.getInstance(Option(valueConverterClass)) - val converted = PythonHadoopUtil.convertRDD[K, V](rdd, 
keyConverter, valueConverter) - JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf)) + val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, + new WritableToJavaConverter(confBroadcasted, batchSize)) + JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } /** @@ -421,15 +421,16 @@ private[spark] object PythonRDD extends Logging { valueClass: String, keyConverterClass: String, valueConverterClass: String, - confAsMap: java.util.HashMap[String, String]) = { + confAsMap: java.util.HashMap[String, String], + batchSize: Int) = { val conf = PythonHadoopUtil.mapToConf(confAsMap) val rdd = newAPIHadoopRDDFromClassNames[K, V, F](sc, None, inputFormatClass, keyClass, valueClass, conf) - val keyConverter = Converter.getInstance(Option(keyConverterClass)) - val valueConverter = Converter.getInstance(Option(valueConverterClass)) - val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter) - JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf)) + val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, + new WritableToJavaConverter(confBroadcasted, batchSize)) + JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]]( @@ -439,18 +440,14 @@ private[spark] object PythonRDD extends Logging { keyClass: String, valueClass: String, conf: Configuration) = { - implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]] - implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]] - implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]] - val kc = kcm.runtimeClass.asInstanceOf[Class[K]] - val vc = vcm.runtimeClass.asInstanceOf[Class[V]] - val fc = fcm.runtimeClass.asInstanceOf[Class[F]] - val rdd = if (path.isDefined) { + val kc = Class.forName(keyClass).asInstanceOf[Class[K]] + val vc = Class.forName(valueClass).asInstanceOf[Class[V]] + val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]] + if (path.isDefined) { sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf) } else { sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc) } - rdd } /** @@ -467,17 +464,16 @@ private[spark] object PythonRDD extends Logging { valueClass: String, keyConverterClass: String, valueConverterClass: String, - confAsMap: java.util.HashMap[String, String]) = { - val conf = PythonHadoopUtil.mapToConf(confAsMap) - val baseConf = sc.hadoopConfiguration() - val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf) + confAsMap: java.util.HashMap[String, String], + batchSize: Int) = { + val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration()) val rdd = hadoopRDDFromClassNames[K, V, F](sc, Some(path), inputFormatClass, keyClass, valueClass, mergedConf) - val keyConverter = Converter.getInstance(Option(keyConverterClass)) - val valueConverter = Converter.getInstance(Option(valueConverterClass)) - val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter) - JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf)) + val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, + new WritableToJavaConverter(confBroadcasted, batchSize)) + JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } /** @@ -494,15 +490,16 @@ private[spark] 
object PythonRDD extends Logging { valueClass: String, keyConverterClass: String, valueConverterClass: String, - confAsMap: java.util.HashMap[String, String]) = { + confAsMap: java.util.HashMap[String, String], + batchSize: Int) = { val conf = PythonHadoopUtil.mapToConf(confAsMap) val rdd = hadoopRDDFromClassNames[K, V, F](sc, None, inputFormatClass, keyClass, valueClass, conf) - val keyConverter = Converter.getInstance(Option(keyConverterClass)) - val valueConverter = Converter.getInstance(Option(valueConverterClass)) - val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter) - JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf)) + val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, + new WritableToJavaConverter(confBroadcasted, batchSize)) + JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]]( @@ -512,18 +509,14 @@ private[spark] object PythonRDD extends Logging { keyClass: String, valueClass: String, conf: Configuration) = { - implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]] - implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]] - implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]] - val kc = kcm.runtimeClass.asInstanceOf[Class[K]] - val vc = vcm.runtimeClass.asInstanceOf[Class[V]] - val fc = fcm.runtimeClass.asInstanceOf[Class[F]] - val rdd = if (path.isDefined) { + val kc = Class.forName(keyClass).asInstanceOf[Class[K]] + val vc = Class.forName(valueClass).asInstanceOf[Class[V]] + val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]] + if (path.isDefined) { sc.sc.hadoopFile(path.get, fc, kc, vc) } else { sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc) } - rdd } def writeUTF(str: String, dataOut: DataOutputStream) { @@ -561,6 +554,152 @@ private[spark] object PythonRDD extends Logging { } } + private def getMergedConf(confAsMap: java.util.HashMap[String, String], + baseConf: Configuration): Configuration = { + val conf = PythonHadoopUtil.mapToConf(confAsMap) + PythonHadoopUtil.mergeConfs(baseConf, conf) + } + + private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null, + valueConverterClass: String = null): (Class[_], Class[_]) = { + // Peek at an element to figure out key/value types. Since Writables are not serializable, + // we cannot call first() on the converted RDD. Instead, we call first() on the original RDD + // and then convert locally. 
+ val (key, value) = rdd.first() + val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass, + new JavaToWritableConverter) + (kc.convert(key).getClass, vc.convert(value).getClass) + } + + private def getKeyValueTypes(keyClass: String, valueClass: String): + Option[(Class[_], Class[_])] = { + for { + k <- Option(keyClass) + v <- Option(valueClass) + } yield (Class.forName(k), Class.forName(v)) + } + + private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String, + defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = { + val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter) + val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter) + (keyConverter, valueConverter) + } + + /** + * Convert an RDD of key-value pairs from internal types to serializable types suitable for + * output, or vice versa. + */ + private def convertRDD[K, V](rdd: RDD[(K, V)], + keyConverterClass: String, + valueConverterClass: String, + defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = { + val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass, + defaultConverter) + PythonHadoopUtil.convertRDD(rdd, kc, vc) + } + + /** + * Output a Python RDD of key-value pairs as a Hadoop SequenceFile using the Writable types + * we convert from the RDD's key and value types. Note that keys and values can't be + * [[org.apache.hadoop.io.Writable]] types already, since Writables are not Java + * `Serializable` and we can't peek at them. The `path` can be on any Hadoop file system. + */ + def saveAsSequenceFile[K, V, C <: CompressionCodec]( + pyRDD: JavaRDD[Array[Byte]], + batchSerialized: Boolean, + path: String, + compressionCodecClass: String) = { + saveAsHadoopFile( + pyRDD, batchSerialized, path, "org.apache.hadoop.mapred.SequenceFileOutputFormat", + null, null, null, null, new java.util.HashMap(), compressionCodecClass) + } + + /** + * Output a Python RDD of key-value pairs to any Hadoop file system, using old Hadoop + * `OutputFormat` in mapred package. Keys and values are converted to suitable output + * types using either user specified converters or, if not specified, + * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types + * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in + * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of + * this RDD. 
+ */ + def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec]( + pyRDD: JavaRDD[Array[Byte]], + batchSerialized: Boolean, + path: String, + outputFormatClass: String, + keyClass: String, + valueClass: String, + keyConverterClass: String, + valueConverterClass: String, + confAsMap: java.util.HashMap[String, String], + compressionCodecClass: String) = { + val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized) + val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse( + inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass)) + val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration) + val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]]) + val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, + new JavaToWritableConverter) + val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]] + converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec) + } + + /** + * Output a Python RDD of key-value pairs to any Hadoop file system, using new Hadoop + * `OutputFormat` in mapreduce package. Keys and values are converted to suitable output + * types using either user specified converters or, if not specified, + * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types + * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in + * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of + * this RDD. + */ + def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]]( + pyRDD: JavaRDD[Array[Byte]], + batchSerialized: Boolean, + path: String, + outputFormatClass: String, + keyClass: String, + valueClass: String, + keyConverterClass: String, + valueConverterClass: String, + confAsMap: java.util.HashMap[String, String]) = { + val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized) + val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse( + inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass)) + val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration) + val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, + new JavaToWritableConverter) + val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]] + converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf) + } + + /** + * Output a Python RDD of key-value pairs to any Hadoop file system, using a Hadoop conf + * converted from the passed-in `confAsMap`. The conf should set relevant output params ( + * e.g., output path, output format, etc), in the same way as it would be configured for + * a Hadoop MapReduce job. Both old and new Hadoop OutputFormat APIs are supported + * (mapred vs. mapreduce). Keys/values are converted for output using either user specified + * converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]]. 
+ */ + def saveAsHadoopDataset[K, V]( + pyRDD: JavaRDD[Array[Byte]], + batchSerialized: Boolean, + confAsMap: java.util.HashMap[String, String], + keyConverterClass: String, + valueConverterClass: String, + useNewAPI: Boolean) = { + val conf = PythonHadoopUtil.mapToConf(confAsMap) + val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized), + keyConverterClass, valueConverterClass, new JavaToWritableConverter) + if (useNewAPI) { + converted.saveAsNewAPIHadoopDataset(conf) + } else { + converted.saveAsHadoopDataset(new JobConf(conf)) + } + } + /** * Convert and RDD of Java objects to and RDD of serialized Python objects, that is usable by * PySpark. diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 9a012e7254901..efc9009c088a8 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -17,13 +17,14 @@ package org.apache.spark.api.python -import scala.util.Try -import org.apache.spark.rdd.RDD -import org.apache.spark.Logging -import scala.util.Success +import scala.collection.JavaConversions._ import scala.util.Failure -import net.razorvine.pickle.Pickler +import scala.util.Try +import net.razorvine.pickle.{Unpickler, Pickler} + +import org.apache.spark.{Logging, SparkException} +import org.apache.spark.rdd.RDD /** Utilities for serialization / deserialization between Python and Java, using Pickle. */ private[python] object SerDeUtil extends Logging { @@ -65,20 +66,52 @@ private[python] object SerDeUtil extends Logging { * by PySpark. By default, if serialization fails, toString is called and the string * representation is serialized */ - def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = { + def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = { val (keyFailed, valueFailed) = checkPickle(rdd.first()) rdd.mapPartitions { iter => val pickle = new Pickler - iter.map { case (k, v) => - if (keyFailed && valueFailed) { - pickle.dumps(Array(k.toString, v.toString)) - } else if (keyFailed) { - pickle.dumps(Array(k.toString, v)) - } else if (!keyFailed && valueFailed) { - pickle.dumps(Array(k, v.toString)) + val cleaned = iter.map { case (k, v) => + val key = if (keyFailed) k.toString else k + val value = if (valueFailed) v.toString else v + Array[Any](key, value) + } + if (batchSize > 1) { + cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched))) + } else { + cleaned.map(pickle.dumps(_)) + } + } + } + + /** + * Convert an RDD of serialized Python tuple (K, V) to RDD[(K, V)]. 
+ */ + def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batchSerialized: Boolean): RDD[(K, V)] = { + def isPair(obj: Any): Boolean = { + Option(obj.getClass.getComponentType).map(!_.isPrimitive).getOrElse(false) && + obj.asInstanceOf[Array[_]].length == 2 + } + pyRDD.mapPartitions { iter => + val unpickle = new Unpickler + val unpickled = + if (batchSerialized) { + iter.flatMap { batch => + unpickle.loads(batch) match { + case objs: java.util.List[_] => collectionAsScalaIterable(objs) + case other => throw new SparkException( + s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD") + } + } } else { - pickle.dumps(Array(k, v)) + iter.map(unpickle.loads(_)) } + unpickled.map { + case obj if isPair(obj) => + // we only accept (K, V) + val arr = obj.asInstanceOf[Array[_]] + (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V]) + case other => throw new SparkException( + s"RDD element of type ${other.getClass.getName} cannot be used") } } } diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala index f0e3fb9aff5a0..d11db978b842e 100644 --- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala +++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala @@ -17,15 +17,16 @@ package org.apache.spark.api.python -import org.apache.spark.SparkContext -import org.apache.hadoop.io._ -import scala.Array import java.io.{DataOutput, DataInput} +import java.nio.charset.Charset + +import org.apache.hadoop.io._ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.{SparkContext, SparkException} /** - * A class to test MsgPack serialization on the Scala side, that will be deserialized + * A class to test Pyrolite serialization on the Scala side, that will be deserialized * in Python * @param str * @param int @@ -54,7 +55,13 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten } } -class TestConverter extends Converter[Any, Any] { +private[python] class TestInputKeyConverter extends Converter[Any, Any] { + override def convert(obj: Any) = { + obj.asInstanceOf[IntWritable].get().toChar + } +} + +private[python] class TestInputValueConverter extends Converter[Any, Any] { import collection.JavaConversions._ override def convert(obj: Any) = { val m = obj.asInstanceOf[MapWritable] @@ -62,6 +69,38 @@ class TestConverter extends Converter[Any, Any] { } } +private[python] class TestOutputKeyConverter extends Converter[Any, Any] { + override def convert(obj: Any) = { + new Text(obj.asInstanceOf[Int].toString) + } +} + +private[python] class TestOutputValueConverter extends Converter[Any, Any] { + import collection.JavaConversions._ + override def convert(obj: Any) = { + new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().head) + } +} + +private[python] class DoubleArrayWritable extends ArrayWritable(classOf[DoubleWritable]) + +private[python] class DoubleArrayToWritableConverter extends Converter[Any, Writable] { + override def convert(obj: Any) = obj match { + case arr if arr.getClass.isArray && arr.getClass.getComponentType == classOf[Double] => + val daw = new DoubleArrayWritable + daw.set(arr.asInstanceOf[Array[Double]].map(new DoubleWritable(_))) + daw + case other => throw new SparkException(s"Data of type $other is not supported") + } +} + 
+private[python] class WritableToDoubleArrayConverter extends Converter[Any, Array[Double]] { + override def convert(obj: Any): Array[Double] = obj match { + case daw : DoubleArrayWritable => daw.get().map(_.asInstanceOf[DoubleWritable].get()) + case other => throw new SparkException(s"Data of type $other is not supported") + } +} + /** * This object contains method to generate SequenceFile test data and write it to a * given directory (probably a temp directory) @@ -97,7 +136,8 @@ object WriteInputFormatTestDataGenerator { sc.parallelize(intKeys).saveAsSequenceFile(intPath) sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath) sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath) - sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath) + sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) } + ).saveAsSequenceFile(bytesPath) val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false)) sc.parallelize(bools).saveAsSequenceFile(boolPath) sc.parallelize(intKeys).map{ case (k, v) => @@ -106,19 +146,20 @@ object WriteInputFormatTestDataGenerator { // Create test data for ArrayWritable val data = Seq( - (1, Array(1.0, 2.0, 3.0)), + (1, Array()), (2, Array(3.0, 4.0, 5.0)), (3, Array(4.0, 5.0, 6.0)) ) sc.parallelize(data, numSlices = 2) .map{ case (k, v) => - (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_)))) - }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath) + val va = new DoubleArrayWritable + va.set(v.map(new DoubleWritable(_))) + (new IntWritable(k), va) + }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, DoubleArrayWritable]](arrPath) // Create test data for MapWritable, with keys DoubleWritable and values Text val mapData = Seq( - (1, Map(2.0 -> "aa")), - (2, Map(3.0 -> "bb")), + (1, Map()), (2, Map(1.0 -> "cc")), (3, Map(2.0 -> "dd")), (2, Map(1.0 -> "aa")), @@ -126,9 +167,9 @@ object WriteInputFormatTestDataGenerator { ) sc.parallelize(mapData, numSlices = 2).map{ case (i, m) => val mw = new MapWritable() - val k = m.keys.head - val v = m.values.head - mw.put(new DoubleWritable(k), new Text(v)) + m.foreach { case (k, v) => + mw.put(new DoubleWritable(k), new Text(v)) + } (new IntWritable(i), mw) }.saveAsSequenceFile(mapPath) diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 90c69713019f2..a88bf27add883 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -383,16 +383,16 @@ Apart from text files, Spark's Python API also supports several other data forma * `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10. -* Details on reading `SequenceFile` and arbitrary Hadoop `InputFormat` are given below. - -### SequenceFile and Hadoop InputFormats +* SequenceFile and Hadoop Input/Output Formats **Note** this feature is currently marked ```Experimental``` and is intended for advanced users. It may be replaced in future with read/write support based on SparkSQL, in which case SparkSQL is the preferred approach. -#### Writable Support +**Writable Support** -PySpark SequenceFile support loads an RDD within Java, and pickles the resulting Java objects using -[Pyrolite](https://github.com/irmen/Pyrolite/). 
The following Writables are automatically converted: +PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the +resulting Java objects using [Pyrolite](https://github.com/irmen/Pyrolite/). When saving an RDD of key-value pairs to SequenceFile, +PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following +Writables are automatically converted: @@ -403,32 +403,30 @@ PySpark SequenceFile support loads an RDD within Java, and pickles the resulting - - -
<tr><th>Writable Type</th><th>Python Type</th></tr>
<tr><td>BooleanWritable</td><td>bool</td></tr>
<tr><td>BytesWritable</td><td>bytearray</td></tr>
<tr><td>NullWritable</td><td>None</td></tr>
-<tr><td>ArrayWritable</td><td>list of primitives, or tuple of objects</td></tr>
<tr><td>MapWritable</td><td>dict</td></tr>
-<tr><td>Custom Class conforming to Java Bean conventions</td>
-    <td>dict of public properties (via JavaBean getters and setters) + __class__ for the class type</td></tr>
</table>
-#### Loading SequenceFiles +Arrays are not handled out-of-the-box. Users need to specify custom `ArrayWritable` subtypes when reading or writing. When writing, +users also need to specify custom converters that convert arrays to custom `ArrayWritable` subtypes. When reading, the default +converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, which then get pickled to Python tuples. To get +Python `array.array` for arrays of primitive types, users need to specify custom converters. + +**Saving and Loading SequenceFiles** -Similarly to text files, SequenceFiles can be loaded by specifying the path. The key and value +Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value classes can be specified, but for standard Writables this is not required. {% highlight python %} ->>> rdd = sc.sequenceFile("path/to/sequencefile/of/doubles") ->>> rdd.collect() # this example has DoubleWritable keys and Text values -[(1.0, u'aa'), - (2.0, u'bb'), - (2.0, u'aa'), - (3.0, u'cc'), - (2.0, u'bb'), - (1.0, u'aa')] +>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) +>>> rdd.saveAsSequenceFile("path/to/file") +>>> sorted(sc.sequenceFile("path/to/file").collect()) +[(1, u'a'), (2, u'aa'), (3, u'aaa')] {% endhighlight %} -#### Loading Other Hadoop InputFormats +**Saving and Loading Other Hadoop Input/Output Formats** -PySpark can also read any Hadoop InputFormat, for both 'new' and 'old' Hadoop APIs. If required, -a Hadoop configuration can be passed in as a Python dict. Here is an example using the +PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both 'new' and 'old' Hadoop MapReduce APIs. +If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the Elasticsearch ESInputFormat: {% highlight python %} @@ -447,8 +445,7 @@ Note that, if the InputFormat simply depends on a Hadoop configuration and/or in the key and value classes can easily be converted according to the above table, then this approach should work well for such cases. -If you have custom serialized binary data (such as loading data from Cassandra / HBase) or custom -classes that don't conform to the JavaBean requirements, then you will first need to +If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to transform that data on the Scala/Java side to something which can be handled by Pyrolite's pickler. A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) trait is provided for this. Simply extend this trait and implement your transformation code in the ```convert``` @@ -456,11 +453,8 @@ method. Remember to ensure that this class, along with any dependencies required classpath. See the [Python examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python) and -the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/pythonconverters) -for examples of using HBase and Cassandra ```InputFormat```. - -Future support for writing data out as ```SequenceFileOutputFormat``` and other ```OutputFormats```, -is forthcoming. +the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters) +for examples of using Cassandra / HBase ```InputFormat``` and ```OutputFormat``` with custom converters. 
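As a concrete illustration of the new write path (a sketch for reference, not part of the patch itself; the output directory and app name are made up), the example converter classes added above can be driven from PySpark like this:

{% highlight python %}
from array import array

from pyspark import SparkContext

sc = SparkContext(appName="DoubleArrayExample")

# Values are arrays of primitive doubles; DoubleArrayToWritableConverter turns
# each one into a DoubleArrayWritable before the SequenceFile is written.
rdd = sc.parallelize([(1, array('d', [1.0, 2.0])), (2, array('d', [3.0, 4.0]))])
rdd.saveAsNewAPIHadoopFile(
    "/tmp/double-arrays",  # hypothetical output path
    "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.spark.api.python.DoubleArrayWritable",
    valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")

# Reading it back with the matching input converter yields array('d', ...) values
# instead of the default tuples.
arrays = sc.sequenceFile(
    "/tmp/double-arrays",
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.spark.api.python.DoubleArrayWritable",
    valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect()
{% endhighlight %}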
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py new file mode 100644 index 0000000000000..1dfbf98604425 --- /dev/null +++ b/examples/src/main/python/cassandra_outputformat.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Create data in Cassandra fist +(following: https://wiki.apache.org/cassandra/GettingStarted) + +cqlsh> CREATE KEYSPACE test + ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; +cqlsh> use test; +cqlsh:test> CREATE TABLE users ( + ... user_id int PRIMARY KEY, + ... fname text, + ... lname text + ... ); + +> cassandra_outputformat test users 1745 john smith +> cassandra_outputformat test users 1744 john doe +> cassandra_outputformat test users 1746 john smith + +cqlsh:test> SELECT * FROM users; + + user_id | fname | lname +---------+-------+------- + 1745 | john | smith + 1744 | john | doe + 1746 | john | smith +""" +if __name__ == "__main__": + if len(sys.argv) != 7: + print >> sys.stderr, """ + Usage: cassandra_outputformat + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py + Assumes you have created the following table in Cassandra already, + running on , in . + + cqlsh:> CREATE TABLE ( + ... user_id int PRIMARY KEY, + ... fname text, + ... lname text + ... ); + """ + exit(-1) + + host = sys.argv[1] + keyspace = sys.argv[2] + cf = sys.argv[3] + sc = SparkContext(appName="CassandraOutputFormat") + + conf = {"cassandra.output.thrift.address":host, + "cassandra.output.thrift.port":"9160", + "cassandra.output.keyspace":keyspace, + "cassandra.output.partitioner.class":"Murmur3Partitioner", + "cassandra.output.cql":"UPDATE " + keyspace + "." 
+ cf + " SET fname = ?, lname = ?", + "mapreduce.output.basename":cf, + "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat", + "mapreduce.job.output.key.class":"java.util.Map", + "mapreduce.job.output.value.class":"java.util.List"} + key = {"user_id" : int(sys.argv[4])} + sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset( + conf=conf, + keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter", + valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter") diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py index 3289d9880a0f5..c9fa8e171c2a1 100644 --- a/examples/src/main/python/hbase_inputformat.py +++ b/examples/src/main/python/hbase_inputformat.py @@ -65,7 +65,8 @@ "org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apache.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", - valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter", + keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter", + valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter", conf=conf) output = hbase_rdd.collect() for (k, v) in output: diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py new file mode 100644 index 0000000000000..5e11548fd13f7 --- /dev/null +++ b/examples/src/main/python/hbase_outputformat.py @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Create test table in HBase first: + +hbase(main):001:0> create 'test', 'f1' +0 row(s) in 0.7840 seconds + +> hbase_outputformat test row1 f1 q1 value1 +> hbase_outputformat test row2 f1 q1 value2 +> hbase_outputformat test row3 f1 q1 value3 +> hbase_outputformat test row4 f1 q1 value4 + +hbase(main):002:0> scan 'test' +ROW COLUMN+CELL + row1 column=f1:q1, timestamp=1405659615726, value=value1 + row2 column=f1:q1, timestamp=1405659626803, value=value2 + row3 column=f1:q1, timestamp=1405659640106, value=value3 + row4 column=f1:q1, timestamp=1405659650292, value=value4 +4 row(s) in 0.0780 seconds +""" +if __name__ == "__main__": + if len(sys.argv) != 7: + print >> sys.stderr, """ + Usage: hbase_outputformat + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_outputformat.py + Assumes you have created
with column family in HBase running on already + """ + exit(-1) + + host = sys.argv[1] + table = sys.argv[2] + sc = SparkContext(appName="HBaseOutputFormat") + + conf = {"hbase.zookeeper.quorum": host, + "hbase.mapred.outputtable": table, + "mapreduce.outputformat.class" : "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", + "mapreduce.job.output.key.class" : "org.apache.hadoop.hbase.io.ImmutableBytesWritable", + "mapreduce.job.output.value.class" : "org.apache.hadoop.io.Writable"} + + sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset( + conf=conf, + keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter", + valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter") diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala index 29a65c7a5f295..83feb5703b908 100644 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala @@ -20,7 +20,7 @@ package org.apache.spark.examples.pythonconverters import org.apache.spark.api.python.Converter import java.nio.ByteBuffer import org.apache.cassandra.utils.ByteBufferUtil -import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap} +import collection.JavaConversions._ /** @@ -44,3 +44,25 @@ class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, St mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb))) } } + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts a + * Map[String, Int] to Cassandra key + */ +class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]] { + override def convert(obj: Any): java.util.Map[String, ByteBuffer] = { + val input = obj.asInstanceOf[java.util.Map[String, Int]] + mapAsJavaMap(input.mapValues(i => ByteBufferUtil.bytes(i))) + } +} + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts a + * List[String] to Cassandra value + */ +class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]] { + override def convert(obj: Any): java.util.List[ByteBuffer] = { + val input = obj.asInstanceOf[java.util.List[String]] + seqAsJavaList(input.map(s => ByteBufferUtil.bytes(s))) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala deleted file mode 100644 index 42ae960bd64a1..0000000000000 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.pythonconverters - -import org.apache.spark.api.python.Converter -import org.apache.hadoop.hbase.client.Result -import org.apache.hadoop.hbase.util.Bytes - -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result - * to a String - */ -class HBaseConverter extends Converter[Any, String] { - override def convert(obj: Any): String = { - val result = obj.asInstanceOf[Result] - Bytes.toStringBinary(result.value()) - } -} diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala new file mode 100644 index 0000000000000..273bee0a8b30f --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.pythonconverters + +import scala.collection.JavaConversions._ + +import org.apache.spark.api.python.Converter +import org.apache.hadoop.hbase.client.{Put, Result} +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts an + * HBase Result to a String + */ +class HBaseResultToStringConverter extends Converter[Any, String] { + override def convert(obj: Any): String = { + val result = obj.asInstanceOf[Result] + Bytes.toStringBinary(result.value()) + } +} + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts an + * ImmutableBytesWritable to a String + */ +class ImmutableBytesWritableToStringConverter extends Converter[Any, String] { + override def convert(obj: Any): String = { + val key = obj.asInstanceOf[ImmutableBytesWritable] + Bytes.toStringBinary(key.get()) + } +} + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts a + * String to an ImmutableBytesWritable + */ +class StringToImmutableBytesWritableConverter extends Converter[Any, ImmutableBytesWritable] { + override def convert(obj: Any): ImmutableBytesWritable = { + val bytes = Bytes.toBytes(obj.asInstanceOf[String]) + new ImmutableBytesWritable(bytes) + } +} + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts a + * list of Strings to HBase Put + */ +class StringListToPutConverter extends Converter[Any, Put] { + override def convert(obj: Any): Put = { + val output = obj.asInstanceOf[java.util.ArrayList[String]].map(Bytes.toBytes(_)).toArray + val put = new Put(output(0)) + put.add(output(1), output(2), output(3)) + } +} diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 024fb881877c9..8504e06a66d91 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -51,6 +51,7 @@ class SparkContext(object): _active_spark_context = None _lock = Lock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH + _default_batch_size_for_serialized_input = 10 def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None, @@ -354,7 +355,7 @@ def _dictToJavaMap(self, d): return jm def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, - valueConverter=None, minSplits=None): + valueConverter=None, minSplits=None, batchSize=None): """ Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. @@ -374,14 +375,18 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, @param valueConverter: @param minSplits: minimum splits in dataset (default min(2, sc.defaultParallelism)) + @param batchSize: The number of Python objects represented as a single + Java object. 
(default sc._default_batch_size_for_serialized_input) """ minSplits = minSplits or min(self.defaultParallelism, 2) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, - keyConverter, valueConverter, minSplits) - return RDD(jrdd, self, PickleSerializer()) + keyConverter, valueConverter, minSplits, batchSize) + return RDD(jrdd, self, ser) def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. @@ -401,14 +406,18 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + valueClass, keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. @@ -425,14 +434,18 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + valueClass, keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. @@ -452,14 +465,18 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter= @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. 
(default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + valueClass, keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. @@ -476,11 +493,15 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, - keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def _checkpointFile(self, name, input_deserializer): jrdd = self._jsc.checkpointFile(name) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 113a082e16721..1342822f352bc 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -231,6 +231,13 @@ def __init__(self, jrdd, ctx, jrdd_deserializer): self._jrdd_deserializer = jrdd_deserializer self._id = jrdd.id() + def _toPickleSerialization(self): + if (self._jrdd_deserializer == PickleSerializer() or + self._jrdd_deserializer == BatchedSerializer(PickleSerializer())): + return self + else: + return self._reserialize(BatchedSerializer(PickleSerializer(), 10)) + def id(self): """ A unique ID for this RDD (within its SparkContext). @@ -1030,6 +1037,113 @@ def first(self): """ return self.take(1)[0] + def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None): + """ + Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are + converted for output using either user specified converters or, by default, + L{org.apache.spark.api.python.JavaToWritableConverter}. 
+ + @param conf: Hadoop job configuration, passed in as a dict + @param keyConverter: (None by default) + @param valueConverter: (None by default) + """ + jconf = self.ctx._dictToJavaMap(conf) + pickledRDD = self._toPickleSerialization() + batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) + self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf, + keyConverter, valueConverter, True) + + def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, + keyConverter=None, valueConverter=None, conf=None): + """ + Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types + will be inferred if not specified. Keys and values are converted for output using either + user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The + C{conf} is applied on top of the base Hadoop conf associated with the SparkContext + of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. + + @param path: path to Hadoop file + @param outputFormatClass: fully qualified classname of Hadoop OutputFormat + (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.IntWritable", None by default) + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.Text", None by default) + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop job configuration, passed in as a dict (None by default) + """ + jconf = self.ctx._dictToJavaMap(conf) + pickledRDD = self._toPickleSerialization() + batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) + self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path, + outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf) + + def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None): + """ + Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + system, using the old Hadoop OutputFormat API (mapred package). Keys/values are + converted for output using either user specified converters or, by default, + L{org.apache.spark.api.python.JavaToWritableConverter}. + + @param conf: Hadoop job configuration, passed in as a dict + @param keyConverter: (None by default) + @param valueConverter: (None by default) + """ + jconf = self.ctx._dictToJavaMap(conf) + pickledRDD = self._toPickleSerialization() + batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) + self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf, + keyConverter, valueConverter, False) + + def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, + keyConverter=None, valueConverter=None, conf=None, + compressionCodecClass=None): + """ + Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + system, using the old Hadoop OutputFormat API (mapred package). Key and value types + will be inferred if not specified. Keys and values are converted for output using either + user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. 
The + C{conf} is applied on top of the base Hadoop conf associated with the SparkContext + of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. + + @param path: path to Hadoop file + @param outputFormatClass: fully qualified classname of Hadoop OutputFormat + (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.IntWritable", None by default) + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.Text", None by default) + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: (None by default) + @param compressionCodecClass: (None by default) + """ + jconf = self.ctx._dictToJavaMap(conf) + pickledRDD = self._toPickleSerialization() + batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) + self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path, + outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, + jconf, compressionCodecClass) + + def saveAsSequenceFile(self, path, compressionCodecClass=None): + """ + Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + system, using the L{org.apache.hadoop.io.Writable} types that we convert from the + RDD's key and value types. The mechanism is as follows: + 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects. + 2. Keys and values of this Java RDD are converted to Writables and written out. + + @param path: path to sequence file + @param compressionCodecClass: (None by default) + """ + pickledRDD = self._toPickleSerialization() + batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) + self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched, + path, compressionCodecClass) + def saveAsPickleFile(self, path, batchSize=10): """ Save this RDD as a SequenceFile of serialized objects. The serializer diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index a92abbf371f18..5ebf2854efcc9 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -19,6 +19,7 @@ Unit tests for PySpark; additional tests are implemented as doctests in individual modules. 
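
[Editorial aside, not part of the patch] Before the tests, a short round-trip sketch of saveAsSequenceFile as described above (the output path is illustrative); this is essentially what test_reserialization below verifies.

    from pyspark import SparkContext

    sc = SparkContext("local", "seqfile-sketch")
    pairs = sc.parallelize([(1, u"aa"), (2, u"bb"), (3, u"cc")])

    # Pickled pairs are unpickled to Java objects by Pyrolite, converted to
    # Writables, and written out; sequenceFile() reverses the trip.
    pairs.saveAsSequenceFile("/tmp/example/sfout/")
    roundtrip = sorted(sc.sequenceFile("/tmp/example/sfout/").collect())
    assert roundtrip == [(1, u"aa"), (2, u"bb"), (3, u"cc")]
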
""" +from array import array from fileinput import input from glob import glob import os @@ -300,6 +301,17 @@ def test_sequencefiles(self): ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] self.assertEqual(doubles, ed) + bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BytesWritable").collect()) + ebs = [(1, bytearray('aa', 'utf-8')), + (1, bytearray('aa', 'utf-8')), + (2, bytearray('aa', 'utf-8')), + (2, bytearray('bb', 'utf-8')), + (2, bytearray('bb', 'utf-8')), + (3, bytearray('cc', 'utf-8'))] + self.assertEqual(bytes, ebs) + text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/", "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text").collect()) @@ -326,14 +338,34 @@ def test_sequencefiles(self): maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.MapWritable").collect()) - em = [(1, {2.0: u'aa'}), + em = [(1, {}), (1, {3.0: u'bb'}), (2, {1.0: u'aa'}), (2, {1.0: u'cc'}), - (2, {3.0: u'bb'}), (3, {2.0: u'dd'})] self.assertEqual(maps, em) + # arrays get pickled to tuples by default + tuples = sorted(self.sc.sequenceFile( + basepath + "/sftestdata/sfarray/", + "org.apache.hadoop.io.IntWritable", + "org.apache.spark.api.python.DoubleArrayWritable").collect()) + et = [(1, ()), + (2, (3.0, 4.0, 5.0)), + (3, (4.0, 5.0, 6.0))] + self.assertEqual(tuples, et) + + # with custom converters, primitive arrays can stay as arrays + arrays = sorted(self.sc.sequenceFile( + basepath + "/sftestdata/sfarray/", + "org.apache.hadoop.io.IntWritable", + "org.apache.spark.api.python.DoubleArrayWritable", + valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect()) + ea = [(1, array('d')), + (2, array('d', [3.0, 4.0, 5.0])), + (3, array('d', [4.0, 5.0, 6.0]))] + self.assertEqual(arrays, ea) + clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/", "org.apache.hadoop.io.Text", "org.apache.spark.api.python.TestWritable").collect()) @@ -342,6 +374,12 @@ def test_sequencefiles(self): u'double': 54.0, u'int': 123, u'str': u'test1'}) self.assertEqual(clazz[0], ec) + unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/", + "org.apache.hadoop.io.Text", + "org.apache.spark.api.python.TestWritable", + batchSize=1).collect()) + self.assertEqual(unbatched_clazz[0], ec) + def test_oldhadoop(self): basepath = self.tempdir.name ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/", @@ -352,10 +390,11 @@ def test_oldhadoop(self): self.assertEqual(ints, ei) hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") - hello = self.sc.hadoopFile(hellopath, - "org.apache.hadoop.mapred.TextInputFormat", - "org.apache.hadoop.io.LongWritable", - "org.apache.hadoop.io.Text").collect() + oldconf = {"mapred.input.dir" : hellopath} + hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text", + conf=oldconf).collect() result = [(0, u'Hello World!')] self.assertEqual(hello, result) @@ -370,10 +409,11 @@ def test_newhadoop(self): self.assertEqual(ints, ei) hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") - hello = self.sc.newAPIHadoopFile(hellopath, - "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", - "org.apache.hadoop.io.LongWritable", - "org.apache.hadoop.io.Text").collect() + newconf = {"mapred.input.dir" : hellopath} + hello = 
self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text", + conf=newconf).collect() result = [(0, u'Hello World!')] self.assertEqual(hello, result) @@ -408,16 +448,267 @@ def test_bad_inputs(self): "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text")) - def test_converter(self): + def test_converters(self): + # use of custom converters basepath = self.tempdir.name maps = sorted(self.sc.sequenceFile( basepath + "/sftestdata/sfmap/", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.MapWritable", - valueConverter="org.apache.spark.api.python.TestConverter").collect()) - em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])] + keyConverter="org.apache.spark.api.python.TestInputKeyConverter", + valueConverter="org.apache.spark.api.python.TestInputValueConverter").collect()) + em = [(u'\x01', []), + (u'\x01', [3.0]), + (u'\x02', [1.0]), + (u'\x02', [1.0]), + (u'\x03', [2.0])] + self.assertEqual(maps, em) + +class TestOutputFormat(PySparkTestCase): + + def setUp(self): + PySparkTestCase.setUp(self) + self.tempdir = tempfile.NamedTemporaryFile(delete=False) + os.unlink(self.tempdir.name) + + def tearDown(self): + PySparkTestCase.tearDown(self) + shutil.rmtree(self.tempdir.name, ignore_errors=True) + + def test_sequencefiles(self): + basepath = self.tempdir.name + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/") + ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect()) + self.assertEqual(ints, ei) + + ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] + self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/") + doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect()) + self.assertEqual(doubles, ed) + + ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))] + self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/") + bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect()) + self.assertEqual(bytes, ebs) + + et = [(u'1', u'aa'), + (u'2', u'bb'), + (u'3', u'cc')] + self.sc.parallelize(et).saveAsSequenceFile(basepath + "/sftext/") + text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect()) + self.assertEqual(text, et) + + eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] + self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/") + bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect()) + self.assertEqual(bools, eb) + + en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)] + self.sc.parallelize(en).saveAsSequenceFile(basepath + "/sfnull/") + nulls = sorted(self.sc.sequenceFile(basepath + "/sfnull/").collect()) + self.assertEqual(nulls, en) + + em = [(1, {}), + (1, {3.0: u'bb'}), + (2, {1.0: u'aa'}), + (2, {1.0: u'cc'}), + (3, {2.0: u'dd'})] + self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/") + maps = sorted(self.sc.sequenceFile(basepath + "/sfmap/").collect()) self.assertEqual(maps, em) + def test_oldhadoop(self): + basepath = self.tempdir.name + dict_data = [(1, {}), + (1, {"row1" : 1.0}), + (2, {"row2" : 2.0})] + self.sc.parallelize(dict_data).saveAsHadoopFile( + basepath + "/oldhadoop/", + "org.apache.hadoop.mapred.SequenceFileOutputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable") + result = sorted(self.sc.hadoopFile( 
+ basepath + "/oldhadoop/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable").collect()) + self.assertEqual(result, dict_data) + + conf = { + "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat", + "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable", + "mapred.output.dir" : basepath + "/olddataset/"} + self.sc.parallelize(dict_data).saveAsHadoopDataset(conf) + input_conf = {"mapred.input.dir" : basepath + "/olddataset/"} + old_dataset = sorted(self.sc.hadoopRDD( + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable", + conf=input_conf).collect()) + self.assertEqual(old_dataset, dict_data) + + def test_newhadoop(self): + basepath = self.tempdir.name + # use custom ArrayWritable types and converters to handle arrays + array_data = [(1, array('d')), + (1, array('d', [1.0, 2.0, 3.0])), + (2, array('d', [3.0, 4.0, 5.0]))] + self.sc.parallelize(array_data).saveAsNewAPIHadoopFile( + basepath + "/newhadoop/", + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.spark.api.python.DoubleArrayWritable", + valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter") + result = sorted(self.sc.newAPIHadoopFile( + basepath + "/newhadoop/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.spark.api.python.DoubleArrayWritable", + valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect()) + self.assertEqual(result, array_data) + + conf = {"mapreduce.outputformat.class" : + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable", + "mapred.output.dir" : basepath + "/newdataset/"} + self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf, + valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter") + input_conf = {"mapred.input.dir" : basepath + "/newdataset/"} + new_dataset = sorted(self.sc.newAPIHadoopRDD( + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.spark.api.python.DoubleArrayWritable", + valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter", + conf=input_conf).collect()) + self.assertEqual(new_dataset, array_data) + + def test_newolderror(self): + basepath = self.tempdir.name + rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) + self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile( + basepath + "/newolderror/saveAsHadoopFile/", + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")) + self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile( + basepath + "/newolderror/saveAsNewAPIHadoopFile/", + "org.apache.hadoop.mapred.SequenceFileOutputFormat")) + + def test_bad_inputs(self): + basepath = self.tempdir.name + rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) + self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile( + basepath + "/badinputs/saveAsHadoopFile/", + "org.apache.hadoop.mapred.NotValidOutputFormat")) + self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile( + basepath + "/badinputs/saveAsNewAPIHadoopFile/", + 
"org.apache.hadoop.mapreduce.lib.output.NotValidOutputFormat")) + + def test_converters(self): + # use of custom converters + basepath = self.tempdir.name + data = [(1, {3.0: u'bb'}), + (2, {1.0: u'aa'}), + (3, {2.0: u'dd'})] + self.sc.parallelize(data).saveAsNewAPIHadoopFile( + basepath + "/converters/", + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + keyConverter="org.apache.spark.api.python.TestOutputKeyConverter", + valueConverter="org.apache.spark.api.python.TestOutputValueConverter") + converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect()) + expected = [(u'1', 3.0), + (u'2', 1.0), + (u'3', 2.0)] + self.assertEqual(converted, expected) + + def test_reserialization(self): + basepath = self.tempdir.name + x = range(1, 5) + y = range(1001, 1005) + data = zip(x, y) + rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y)) + rdd.saveAsSequenceFile(basepath + "/reserialize/sequence") + result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect()) + self.assertEqual(result1, data) + + rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop", + "org.apache.hadoop.mapred.SequenceFileOutputFormat") + result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect()) + self.assertEqual(result2, data) + + rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop", + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") + result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect()) + self.assertEqual(result3, data) + + conf4 = { + "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat", + "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable", + "mapred.output.dir" : basepath + "/reserialize/dataset"} + rdd.saveAsHadoopDataset(conf4) + result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect()) + self.assertEqual(result4, data) + + conf5 = {"mapreduce.outputformat.class" : + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable", + "mapred.output.dir" : basepath + "/reserialize/newdataset"} + rdd.saveAsNewAPIHadoopDataset(conf5) + result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect()) + self.assertEqual(result5, data) + + def test_unbatched_save_and_read(self): + basepath = self.tempdir.name + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile( + basepath + "/unbatched/") + + unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/", + batchSize=1).collect()) + self.assertEqual(unbatched_sequence, ei) + + unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text", + batchSize=1).collect()) + self.assertEqual(unbatched_hadoopFile, ei) + + unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text", + batchSize=1).collect()) + self.assertEqual(unbatched_newAPIHadoopFile, ei) + + oldconf = {"mapred.input.dir" : basepath + "/unbatched/"} + unbatched_hadoopRDD = 
sorted(self.sc.hadoopRDD( + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text", + conf=oldconf, + batchSize=1).collect()) + self.assertEqual(unbatched_hadoopRDD, ei) + + newconf = {"mapred.input.dir" : basepath + "/unbatched/"} + unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD( + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text", + conf=newconf, + batchSize=1).collect()) + self.assertEqual(unbatched_newAPIHadoopRDD, ei) + + def test_malformed_RDD(self): + basepath = self.tempdir.name + # non-batch-serialized RDD[[(K, V)]] should be rejected + data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]] + rdd = self.sc.parallelize(data, numSlices=len(data)) + self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile( + basepath + "/malformed/sequence")) class TestDaemon(unittest.TestCase): def connect(self, port):