SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats #455
Closed
Changes from all commits
Commits
75 commits
All commits are authored by MLnick.
d86325f  Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop …
4b0a43f  Refactoring utils into own objects. Cleaning up old commented-out code
c304cc8  Adding supporting sequencefiles for tests. Cleaning up
4e7c9e3  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
818a1e6  Add sequencefile and Hadoop InputFormat support to PythonRDD
4294cbb  Add old Hadoop api methods. Clean up and expand comments. Clean up ar…
0f5cd84  Remove unused pair UTF8 class. Add comments to msgpack deserializer
f1d73e3  mergeConfs returns a copy rather than mutating one of the input argum…
4d7ef2e  Fix indentation
eb40036  Remove unused comment lines
1c8efbc  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
619c0fa  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
703ee65  Add back msgpack
174f520  Add back graphx settings
795a763  Change name to WriteInputFormatTestDataGenerator. Cleanup some var na…
2beeedb  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
97ef708  Remove old writeToStream
41856a5  Merge branch 'master' into pyspark-inputformats
f2d76a0  Merge branch 'master' into pyspark-inputformats
e67212a  Add back msgpack dependency
dd57922  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
d72bf18  msgpack
0c612e5  Merge branch 'master' into pyspark-inputformats
65360d5  Adding test SequenceFiles
25da1ca  Add generator for nulls, bools, bytes and maps
7237263  Add back msgpack serializer and hadoop file code lost during merging
a67dfad  Clean up Msgpack serialization and registering
1bbbfb0  Clean up SparkBuild from merge
9d2256e  Merge branch 'master' into pyspark-inputformats
f6aac55  Bring back msgpack
951c117  Merge branch 'master' into pyspark-inputformats
b20ec7e  Clean up merge duplicate dependencies
4e08983  Clean up docs for PySpark context methods
fc5099e  Add Apache license headers
31a2fff  Scalastyle fixes
450e0a2  Merge branch 'master' into pyspark-inputformats
f60959e  Remove msgpack dependency and serializer from PySpark
17a656b  remove binary sequencefile for tests
1d7c17c  Amend tests to auto-generate sequencefile data in temp dir
c0ebfb6  Change sequencefile test data generator to easily be called from PySp…
44f2857  Remove msgpack dependency and switch serialization to Pyrolite, plus …
e7552fa  Merge branch 'master' into pyspark-inputformats
64eb051  Scalastyle fix
78978d9  Add doc for SequenceFile and InputFormat support to Python programmin…
e001b94  Fix test failures due to ordering
bef3afb  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
35b8e3a  Another fix for test ordering
5af4770  Merge branch 'master' into pyspark-inputformats
077ecb2  Recover earlier changes lost in previous merge for context.py
9ef1896  Recover earlier changes lost in previous merge for serializers.py
93ef995  Add back context.py changes
7caa73a  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
d0f52b6  Python programming guide
84fe8e3  Python programming guide space formatting
9fe6bd5  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
15a7d07  Remove default args for key/value classes. Arg names to camelCase
01e0813  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
1a4a1d6  Address @mateiz style comments
94beedc  Clean up args in PythonRDD. Set key/value converter defaults to None …
43eb728  PySpark InputFormats docs into programming guide
085b55f  Move input format tests to tests.py and clean up docs
5757f6e  Default key/value classes for sequenceFile are None
b65606f  Add converter interface
2c18513  Add examples for reading HBase and Cassandra InputFormats from Python
3f90c3e  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
1eaa08b  HBase -> Cassandra app name oversight
eeb8205  Fix path relative to SPARK_HOME in tests
365d0be  Make classes private[python]. Add docs and @Experimental annotation t…
a985492  Move Converter examples to own package
5ebacfa  Update docs for PySpark input formats
cde6af9  Parameterize converter trait
d150431  Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
4c972d8  Add license headers
761269b  Address @pwendell comments, simplify default writable conversions and…
268df7e  Documentation changes per @pwendell comments
129 changes: 129 additions & 0 deletions
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -0,0 +1,129 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.python

import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
import org.apache.spark.annotation.Experimental


/**
 * :: Experimental ::
 * A trait for use with reading custom classes in PySpark. Implement this trait and add custom
 * transformation code by overriding the convert method.
 */
@Experimental
trait Converter[T, U] extends Serializable {
  def convert(obj: T): U
}

private[python] object Converter extends Logging {

  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
    converterClass.map { cc =>
      Try {
        val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
        logInfo(s"Loaded converter: $cc")
        c
      } match {
        case Success(c) => c
        case Failure(err) =>
          logError(s"Failed to load converter: $cc")
          throw err
      }
    }.getOrElse { new DefaultConverter }
  }
}

/**
 * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
 * Other objects are passed through without conversion.
 */
private[python] class DefaultConverter extends Converter[Any, Any] {

  /**
   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
   * object representation
   */
  private def convertWritable(writable: Writable): Any = {
    import collection.JavaConversions._
    writable match {
      case iw: IntWritable => iw.get()
      case dw: DoubleWritable => dw.get()
      case lw: LongWritable => lw.get()
      case fw: FloatWritable => fw.get()
      case t: Text => t.toString
      case bw: BooleanWritable => bw.get()
      case byw: BytesWritable => byw.getBytes
      case n: NullWritable => null
      case aw: ArrayWritable => aw.get().map(convertWritable(_))
      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
        (convertWritable(k), convertWritable(v))
      }.toMap)
      case other => other
    }
  }

  def convert(obj: Any): Any = {
    obj match {
      case writable: Writable =>
        convertWritable(writable)
      case _ =>
        obj
    }
  }
}

/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {

  /**
   * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
   */
  def mapToConf(map: java.util.Map[String, String]): Configuration = {
    import collection.JavaConversions._
    val conf = new Configuration()
    map.foreach{ case (k, v) => conf.set(k, v) }
    conf
  }

  /**
   * Merges two configurations, returns a copy of left with keys from right overwriting
   * any matching keys in left
   */
  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
    import collection.JavaConversions._
    val copy = new Configuration(left)
    right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
    copy
  }

  /**
   * Converts an RDD of key-value pairs, where key and/or value could be instances of
   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
   */
  def convertRDD[K, V](rdd: RDD[(K, V)],
                       keyConverter: Converter[Any, Any],
                       valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
    rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
  }

}
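
As a minimal sketch for readers of this thread (not part of this patch), a user-supplied Converter might look like the class below. The name UpperCaseTextConverter is hypothetical; it unwraps a Hadoop Text into a plain, upper-cased String so the Python side receives an ordinary string. A concrete class like this needs a no-arg constructor, since Converter.getInstance instantiates it reflectively by its fully-qualified class name.

import org.apache.hadoop.io.Text
import org.apache.spark.api.python.Converter

// Hypothetical example converter: turns Text objects into upper-cased Strings.
class UpperCaseTextConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case t: Text => t.toString.toUpperCase // unwrap the Writable and transform it
    case other => other                    // pass anything else through unchanged
  }
}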
Review comment on PythonHadoopUtil.convertRDD:
I think this should return an RDD[(Any, Any)], not an RDD[(K, V)]. Once we've converted it we've lost the type information. In the code you've added, all we do is pass the entries to the pickler, which just inspects the runtime type, but if anyone tried to do something that actually relied on this returning K or V it would throw a runtime exception.
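
As a hedged illustration of this point (not code from the patch, and assuming it runs inside the org.apache.spark.api.python package, since DefaultConverter is private[python]): after the default conversion the objects are plain JVM types, so a signature that still promises the original K and V could only be honoured by an unchecked cast that fails when used.

import org.apache.hadoop.io.{IntWritable, Text}

// After the default conversion, the runtime classes are Integer and String,
// not the IntWritable and Text that K and V would suggest.
val converter = new DefaultConverter
val key = converter.convert(new IntWritable(42))  // runtime class: java.lang.Integer
val value = converter.convert(new Text("spark"))  // runtime class: java.lang.String
println(s"${key.getClass.getName}, ${value.getClass.getName}")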