[SPARK-2024] Add saveAsSequenceFile to PySpark #1338

Closed · wants to merge 9 commits
@@ -17,8 +17,9 @@

package org.apache.spark.api.python

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
* transformation code by overriding the convert method.
*/
@Experimental
trait Converter[T, U] extends Serializable {
trait Converter[T, +U] extends Serializable {
def convert(obj: T): U
}
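
The trait above is the extension point: a user registers a converter by passing its fully-qualified class name from the Python side, and `Converter.getInstance` below instantiates it reflectively, falling back to the supplied default. A minimal sketch of a custom converter, with invented package and class names (not part of this patch):

// Illustrative only: a user-defined converter that upper-cases Text keys before
// they are pickled to Python; anything else passes through unchanged.
// The package and class names are invented for the example.
package com.example.converters

import org.apache.hadoop.io.Text
import org.apache.spark.api.python.Converter

class TextToUpperCaseConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case t: Text => t.toString.toUpperCase
    case other => other
  }
}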

private[python] object Converter extends Logging {

def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
def getInstance(converterClass: Option[String],
defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
@@ -49,15 +51,17 @@ private[python] object Converter extends Logging {
logError(s"Failed to load converter: $cc")
throw err
}
}.getOrElse { new DefaultConverter }
}.getOrElse { defaultConverter }
}
}

/**
* A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
* Other objects are passed through without conversion.
*/
private[python] class DefaultConverter extends Converter[Any, Any] {
private[python] class WritableToJavaConverter(
conf: Broadcast[SerializableWritable[Configuration]],
batchSize: Int) extends Converter[Any, Any] {

/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
case fw: FloatWritable => fw.get()
case t: Text => t.toString
case bw: BooleanWritable => bw.get()
case byw: BytesWritable => byw.getBytes
case byw: BytesWritable =>
val bytes = new Array[Byte](byw.getLength)
System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
bytes
case n: NullWritable => null
case aw: ArrayWritable => aw.get().map(convertWritable(_))
case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
(convertWritable(k), convertWritable(v))
}.toMap)
case aw: ArrayWritable =>
// Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
// Since we can't determine element types for empty arrays, we will not attempt to
// convert to primitive arrays (which get pickled to Python arrays). Users may want to
// write custom converters for arrays if they know the element types a priori.
aw.get().map(convertWritable(_))
case mw: MapWritable =>
val map = new java.util.HashMap[Any, Any]()
mw.foreach { case (k, v) =>
map.put(convertWritable(k), convertWritable(v))
}
map
// Hadoop record readers reuse Writable objects, so clone when buffering for batched pickling.
case w: Writable =>
if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
case other => other
}
}

def convert(obj: Any): Any = {
override def convert(obj: Any): Any = {
obj match {
case writable: Writable =>
convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
}
}
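
The `WritableUtils.clone` call a few lines up matters because Hadoop record readers reuse a single Writable instance across records; when `batchSize > 1` the converted objects are buffered before being pickled, so without a copy every buffered entry would point at the last record read. A self-contained sketch of that reuse effect, using only hadoop-common classes (the object name is invented):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, WritableUtils}

object WritableReuseSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val reused = new Text()      // stands in for the record reader's reused object
    val unCloned = Seq("a", "b").map { s => reused.set(s); reused }
    println(unCloned.map(_.toString))   // List(b, b): both entries are the same object
    val cloned = Seq("a", "b").map { s => reused.set(s); WritableUtils.clone(reused, conf) }
    println(cloned.map(_.toString))     // List(a, b): each record preserved
  }
}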

/**
* A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array
* types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]]
* to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and
* [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in
* the PySpark RDD `saveAsNewAPIHadoopFile` doctest.
*/
private[python] class JavaToWritableConverter extends Converter[Any, Writable] {

/**
* Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not
* supported out-of-the-box.
*/
private def convertToWritable(obj: Any): Writable = {
import collection.JavaConversions._
obj match {
case i: java.lang.Integer => new IntWritable(i)
case d: java.lang.Double => new DoubleWritable(d)
case l: java.lang.Long => new LongWritable(l)
case f: java.lang.Float => new FloatWritable(f)
case s: java.lang.String => new Text(s)
case b: java.lang.Boolean => new BooleanWritable(b)
case aob: Array[Byte] => new BytesWritable(aob)
case null => NullWritable.get()
case map: java.util.Map[_, _] =>
val mapWritable = new MapWritable()
map.foreach { case (k, v) =>
mapWritable.put(convertToWritable(k), convertToWritable(v))
}
mapWritable
case other => throw new SparkException(
s"Data of type ${other.getClass.getName} cannot be used")
}
}

override def convert(obj: Any): Writable = obj match {
case writable: Writable => writable
case other => convertToWritable(other)
}
}
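
For array types, the scaladoc above points to `DoubleArrayWritable` and `DoubleArrayToWritableConverter`, defined elsewhere in this pull request, as the pattern to follow. A sketch of that pattern, illustrative rather than the patch's exact code:

// Sketch of the ArrayWritable subclass + converter pairing described above;
// illustrative only, not necessarily identical to the classes this PR adds.
import org.apache.hadoop.io.{ArrayWritable, DoubleWritable, Writable}
import org.apache.spark.SparkException
import org.apache.spark.api.python.Converter

// ArrayWritable needs a concrete element class, hence the subclass.
class DoubleArrayWritable extends ArrayWritable(classOf[DoubleWritable])

class DoubleArrayToWritableConverter extends Converter[Any, Writable] {
  override def convert(obj: Any): Writable = obj match {
    case arr: Array[Double] =>
      val writable = new DoubleArrayWritable()
      writable.set(arr.map(d => new DoubleWritable(d): Writable))
      writable
    case other => throw new SparkException(
      s"Data of type ${other.getClass.getName} cannot be converted to DoubleArrayWritable")
  }
}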

/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {

@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {

/**
* Converts an RDD of key-value pairs, where key and/or value could be instances of
* [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
* [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
*/
def convertRDD[K, V](rdd: RDD[(K, V)],
keyConverter: Converter[Any, Any],
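
The rest of `convertRDD` is collapsed in this view, but the doc comment above states what it does: apply the key and value converters element-wise, in either direction. A sketch of that shape, with a hypothetical object and method name since the real body is not shown here:

// Illustrative sketch of an element-wise conversion matching the doc comment above;
// the name and body are assumptions, since convertRDD's body is collapsed in this diff.
import org.apache.spark.api.python.Converter
import org.apache.spark.rdd.RDD

object ConvertRDDSketch {
  def convertPairs[K, V](rdd: RDD[(K, V)],
                         keyConverter: Converter[Any, Any],
                         valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
    rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
  }
}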