SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats #455

Status: Closed (wants to merge 75 commits)

Commits (75)
d86325f
Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop …
MLnick Dec 9, 2013
4b0a43f
Refactoring utils into own objects. Cleaning up old commented-out code
MLnick Dec 12, 2013
c304cc8
Adding supporting sequencefiles for tests. Cleaning up
MLnick Dec 15, 2013
4e7c9e3
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Dec 15, 2013
818a1e6
Add sequencefile and Hadoop InputFormat support to PythonRDD
MLnick Dec 15, 2013
4294cbb
Add old Hadoop api methods. Clean up and expand comments. Clean up ar…
MLnick Dec 19, 2013
0f5cd84
Remove unused pair UTF8 class. Add comments to msgpack deserializer
MLnick Dec 19, 2013
f1d73e3
mergeConfs returns a copy rather than mutating one of the input argum…
MLnick Dec 19, 2013
4d7ef2e
Fix indentation
MLnick Dec 19, 2013
eb40036
Remove unused comment lines
MLnick Dec 19, 2013
1c8efbc
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jan 13, 2014
619c0fa
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jan 20, 2014
703ee65
Add back msgpack
MLnick Jan 20, 2014
174f520
Add back graphx settings
MLnick Jan 20, 2014
795a763
Change name to WriteInputFormatTestDataGenerator. Cleanup some var na…
MLnick Jan 20, 2014
2beeedb
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Feb 8, 2014
97ef708
Remove old writeToStream
MLnick Feb 14, 2014
41856a5
Merge branch 'master' into pyspark-inputformats
MLnick Mar 19, 2014
f2d76a0
Merge branch 'master' into pyspark-inputformats
MLnick Mar 19, 2014
e67212a
Add back msgpack dependency
MLnick Mar 19, 2014
dd57922
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Apr 10, 2014
d72bf18
msgpack
MLnick Apr 10, 2014
0c612e5
Merge branch 'master' into pyspark-inputformats
MLnick Apr 12, 2014
65360d5
Adding test SequenceFiles
MLnick Apr 18, 2014
25da1ca
Add generator for nulls, bools, bytes and maps
MLnick Apr 18, 2014
7237263
Add back msgpack serializer and hadoop file code lost during merging
MLnick Apr 18, 2014
a67dfad
Clean up Msgpack serialization and registering
MLnick Apr 18, 2014
1bbbfb0
Clean up SparkBuild from merge
MLnick Apr 18, 2014
9d2256e
Merge branch 'master' into pyspark-inputformats
MLnick Apr 18, 2014
f6aac55
Bring back msgpack
MLnick Apr 18, 2014
951c117
Merge branch 'master' into pyspark-inputformats
MLnick Apr 19, 2014
b20ec7e
Clean up merge duplicate dependencies
MLnick Apr 19, 2014
4e08983
Clean up docs for PySpark context methods
MLnick Apr 19, 2014
fc5099e
Add Apache license headers
MLnick Apr 19, 2014
31a2fff
Scalastyle fixes
MLnick Apr 21, 2014
450e0a2
Merge branch 'master' into pyspark-inputformats
MLnick Apr 21, 2014
f60959e
Remove msgpack dependency and serializer from PySpark
MLnick Apr 21, 2014
17a656b
remove binary sequencefile for tests
MLnick Apr 21, 2014
1d7c17c
Amend tests to auto-generate sequencefile data in temp dir
MLnick Apr 21, 2014
c0ebfb6
Change sequencefile test data generator to easily be called from PySp…
MLnick Apr 21, 2014
44f2857
Remove msgpack dependency and switch serialization to Pyrolite, plus …
MLnick Apr 21, 2014
e7552fa
Merge branch 'master' into pyspark-inputformats
MLnick Apr 22, 2014
64eb051
Scalastyle fix
MLnick Apr 22, 2014
78978d9
Add doc for SequenceFile and InputFormat support to Python programmin…
MLnick Apr 22, 2014
e001b94
Fix test failures due to ordering
MLnick Apr 23, 2014
bef3afb
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Apr 23, 2014
35b8e3a
Another fix for test ordering
MLnick Apr 23, 2014
5af4770
Merge branch 'master' into pyspark-inputformats
MLnick Apr 23, 2014
077ecb2
Recover earlier changes lost in previous merge for context.py
MLnick Apr 23, 2014
9ef1896
Recover earlier changes lost in previous merge for serializers.py
MLnick Apr 23, 2014
93ef995
Add back context.py changes
MLnick Apr 23, 2014
7caa73a
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick May 23, 2014
d0f52b6
Python programming guide
MLnick May 23, 2014
84fe8e3
Python programming guide space formatting
MLnick May 23, 2014
9fe6bd5
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick May 31, 2014
15a7d07
Remove default args for key/value classes. Arg names to camelCase
MLnick Jun 3, 2014
01e0813
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jun 3, 2014
1a4a1d6
Address @mateiz style comments
MLnick Jun 3, 2014
94beedc
Clean up args in PythonRDD. Set key/value converter defaults to None …
MLnick Jun 3, 2014
43eb728
PySpark InputFormats docs into programming guide
MLnick Jun 3, 2014
085b55f
Move input format tests to tests.py and clean up docs
MLnick Jun 3, 2014
5757f6e
Default key/value classes for sequenceFile are None
MLnick Jun 3, 2014
b65606f
Add converter interface
MLnick Jun 4, 2014
2c18513
Add examples for reading HBase and Cassandra InputFormats from Python
MLnick Jun 4, 2014
3f90c3e
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jun 4, 2014
1eaa08b
HBase -> Cassandra app name oversight
MLnick Jun 4, 2014
eeb8205
Fix path relative to SPARK_HOME in tests
MLnick Jun 4, 2014
365d0be
Make classes private[python]. Add docs and @Experimental annotation t…
MLnick Jun 5, 2014
a985492
Move Converter examples to own package
MLnick Jun 5, 2014
5ebacfa
Update docs for PySpark input formats
MLnick Jun 5, 2014
cde6af9
Parameterize converter trait
MLnick Jun 6, 2014
d150431
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jun 6, 2014
4c972d8
Add license headers
MLnick Jun 6, 2014
761269b
Address @pwendell comments, simplify default writable conversions and…
MLnick Jun 7, 2014
268df7e
Documentation changes per @pwendell comments
MLnick Jun 8, 2014
129 changes: 129 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.python

import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
import org.apache.spark.annotation.Experimental


/**
* :: Experimental ::
* A trait for use with reading custom classes in PySpark. Implement this trait and add custom
* transformation code by overriding the convert method.
*/
@Experimental
trait Converter[T, U] extends Serializable {
def convert(obj: T): U
}

private[python] object Converter extends Logging {

def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
logInfo(s"Loaded converter: $cc")
c
} match {
case Success(c) => c
case Failure(err) =>
logError(s"Failed to load converter: $cc")
throw err
}
}.getOrElse { new DefaultConverter }
}
}

/**
* A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
* Other objects are passed through without conversion.
*/
private[python] class DefaultConverter extends Converter[Any, Any] {

/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
* object representation
*/
private def convertWritable(writable: Writable): Any = {
import collection.JavaConversions._
writable match {
case iw: IntWritable => iw.get()
case dw: DoubleWritable => dw.get()
case lw: LongWritable => lw.get()
case fw: FloatWritable => fw.get()
case t: Text => t.toString
case bw: BooleanWritable => bw.get()
case byw: BytesWritable => byw.getBytes
case n: NullWritable => null
case aw: ArrayWritable => aw.get().map(convertWritable(_))
case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
(convertWritable(k), convertWritable(v))
}.toMap)
case other => other
}
}

def convert(obj: Any): Any = {
obj match {
case writable: Writable =>
convertWritable(writable)
case _ =>
obj
}
}
}

/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {

/**
* Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
*/
def mapToConf(map: java.util.Map[String, String]): Configuration = {
import collection.JavaConversions._
val conf = new Configuration()
map.foreach{ case (k, v) => conf.set(k, v) }
conf
}

/**
* Merges two configurations, returns a copy of left with keys from right overwriting
* any matching keys in left
*/
def mergeConfs(left: Configuration, right: Configuration): Configuration = {
import collection.JavaConversions._
val copy = new Configuration(left)
right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
copy
}

/**
* Converts an RDD of key-value pairs, where key and/or value could be instances of
* [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
*/
def convertRDD[K, V](rdd: RDD[(K, V)],
Contributor review comment:

I think this should return an RDD[(Any, Any)], not an RDD[(K, V)]. Once we've converted it we've lost the type information. In the code you've added, all we do is pass the entries to the pickler, which just inspects the runtime type; but if anyone tried to do something that actually relied on this returning K or V, it would throw a runtime exception.

keyConverter: Converter[Any, Any],
valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
}

}
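
As an aside, the Converter trait above is the extension point for custom key/value conversion. Below is a minimal sketch of a user-defined converter; the UpperCaseTextConverter name and behaviour are illustrative only and not part of this patch. Its fully qualified class name would be passed as keyConverterClass or valueConverterClass, and Converter.getInstance loads it via Class.forName, falling back to DefaultConverter when no class name is supplied.

import org.apache.hadoop.io.Text
import org.apache.spark.api.python.Converter

// Hypothetical converter: turns Hadoop Text objects into upper-cased Scala Strings
// before they are serialized for the Python side; anything else passes through unchanged.
class UpperCaseTextConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case t: Text => t.toString.toUpperCase
    case other => other
  }
}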
179 changes: 178 additions & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -28,6 +28,9 @@ import scala.util.Try

import net.razorvine.pickle.{Pickler, Unpickler}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
@@ -266,7 +269,7 @@ private object SpecialLengths {
val TIMING_DATA = -3
}

private[spark] object PythonRDD {
private[spark] object PythonRDD extends Logging {
val UTF8 = Charset.forName("UTF-8")

/**
@@ -346,6 +349,180 @@ private[spark] object PythonRDD {
}
}

/**
* Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
* key and value class.
* A key and/or value converter class can optionally be passed in
* (see [[org.apache.spark.api.python.Converter]])
*/
def sequenceFile[K, V](
sc: JavaSparkContext,
path: String,
keyClassMaybeNull: String,
valueClassMaybeNull: String,
keyConverterClass: String,
valueConverterClass: String,
minSplits: Int) = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
val vc = vcm.runtimeClass.asInstanceOf[Class[V]]

val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val keyConverter = Converter.getInstance(Option(keyConverterClass))
val valueConverter = Converter.getInstance(Option(valueConverterClass))
val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
}

/**
* Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
* key and value class.
* A key and/or value converter class can optionally be passed in
* (see [[org.apache.spark.api.python.Converter]])
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
sc: JavaSparkContext,
path: String,
inputFormatClass: String,
keyClass: String,
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val keyConverter = Converter.getInstance(Option(keyConverterClass))
val valueConverter = Converter.getInstance(Option(valueConverterClass))
val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
}

/**
* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is
* passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
* key and value class.
* A key and/or value converter class can optionally be passed in
* (see [[org.apache.spark.api.python.Converter]])
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
sc: JavaSparkContext,
inputFormatClass: String,
keyClass: String,
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
val keyConverter = Converter.getInstance(Option(keyConverterClass))
val valueConverter = Converter.getInstance(Option(valueConverterClass))
val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
}

private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
sc: JavaSparkContext,
path: Option[String] = None,
inputFormatClass: String,
keyClass: String,
valueClass: String,
conf: Configuration) = {
implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
val rdd = if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
}
rdd
}

/**
* Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
* key and value class.
* A key and/or value converter class can optionally be passed in
* (see [[org.apache.spark.api.python.Converter]])
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](
sc: JavaSparkContext,
path: String,
inputFormatClass: String,
keyClass: String,
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val keyConverter = Converter.getInstance(Option(keyConverterClass))
val valueConverter = Converter.getInstance(Option(valueConverterClass))
val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
}

/**
* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map
* that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
* key and value class
* A key and/or value converter class can optionally be passed in
* (see [[org.apache.spark.api.python.Converter]])
*/
def hadoopRDD[K, V, F <: InputFormat[K, V]](
sc: JavaSparkContext,
inputFormatClass: String,
keyClass: String,
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
val keyConverter = Converter.getInstance(Option(keyConverterClass))
val valueConverter = Converter.getInstance(Option(valueConverterClass))
val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
}

private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
sc: JavaSparkContext,
path: Option[String] = None,
inputFormatClass: String,
keyClass: String,
valueClass: String,
conf: Configuration) = {
implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
val rdd = if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
}
rdd
}

def writeUTF(str: String, dataOut: DataOutputStream) {
val bytes = str.getBytes(UTF8)
dataOut.writeInt(bytes.length)
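
For reference, here is a rough sketch of how the sequenceFile entry point added above could be exercised from Scala. In practice PySpark's context.py calls it over Py4J; the local file path below is hypothetical, and because PythonRDD is private[spark] this only compiles from code inside the org.apache.spark package.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.python.PythonRDD

// Hypothetical driver code: read an IntWritable/Text SequenceFile and get back
// the serialized RDD that would be handed to the Python side.
val jsc = new JavaSparkContext("local", "seqfile-demo")
val pickled = PythonRDD.sequenceFile[IntWritable, Text](
  jsc,
  "/tmp/test.seq",                    // path to an existing SequenceFile (illustrative)
  "org.apache.hadoop.io.IntWritable", // key class; null would default to Text
  "org.apache.hadoop.io.Text",        // value class; null would default to Text
  null,                               // keyConverterClass -> DefaultConverter
  null,                               // valueConverterClass -> DefaultConverter
  2)                                  // minSplits
println(pickled.count())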