[SPARK-2024] Add saveAsSequenceFile to PySpark #1338
Conversation
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16446/
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16468/
LGTM, awesome!
Jenkins, retest this please
QA tests have started for PR 1338. This patch merges cleanly.
@@ -403,31 +403,30 @@ PySpark SequenceFile support loads an RDD within Java, and pickles the resulting
<tr><td>BooleanWritable</td><td>bool</td></tr>
<tr><td>BytesWritable</td><td>bytearray</td></tr>
<tr><td>NullWritable</td><td>None</td></tr>
<tr><td>ArrayWritable</td><td>list of primitives, or tuple of objects</td></tr>
Did this work before and get removed now, or was it a mistake in the docs?
@mateiz we don't handle arrays currently, and this is also the case for the Scala API. The reason is that the ArrayWritable class doesn't have a no-arg constructor for creating an empty instance upon reading, so users need to create subtypes. Although we could add subtypes for handling primitive arrays, that would make Spark a dependency for users, which we probably don't want.
For conversion between arrays and ArrayWritable subtypes, we can convert automatically when reading, as long as the subtype is on the classpath. However, when writing we can't convert arrays to ArrayWritable subtypes automatically, since we don't know which subtype to use, so users need to specify custom converters.
We should look into ArrayPrimitiveWritable, which is not available in Hadoop v1.0.4.
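For readers following the thread, here is a minimal sketch of the kind of ArrayWritable subtype being discussed (the class name and element type are illustrative, not part of this patch):

```scala
import org.apache.hadoop.io.{ArrayWritable, IntWritable, Writable}

// ArrayWritable itself has no no-arg constructor, so Hadoop cannot
// instantiate it reflectively when reading a SequenceFile. A subtype
// that fixes the element class restores that constructor.
class IntArrayWritable extends ArrayWritable(classOf[IntWritable]) {
  // Convenience constructor for the writing side.
  def this(values: Array[Writable]) = { this(); set(values) }
}
```

With such a subtype on the classpath, reading can work out of the box; writing still requires a user-supplied converter, as noted above.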
Ah, I see, it looks like in Scala we can write them but not read them. It's probably fine to remove them from the table then.
We can't write arrays in Scala either (the implicit conversion from Array to ArrayWritable is marked private). Otherwise it would be awkward, since we couldn't read them back given that ArrayWritable doesn't have a no-arg constructor. For user-supplied ArrayWritable subtypes, we can read them; they just won't be implicitly converted. Essentially the same support as we have in Python.
This looks awesome, thanks for putting it together! One comment I have, though, is that we should add more test coverage to make sure we cover all the supported data types. Instead of doing this in doc comments, which gets unwieldy, you can do it in python/pyspark/tests.py, which is a standalone test file. Just make sure we have tests that cover each supported data type in sequence files. @MLnick you should look at this too when you have a chance.
I have had a quick look over it and will try to do a more detailed one this weekend. At a high level it looks good; 2 comments so far:
Will provide any more feedback as I go through it in more detail. (btw thanks for fixing up the
QA results for PR 1338:
Great - I will review in more detail after that. Would be great to get this
@MLnick I'm thinking of removing the tests and programming guide entry for custom classes (JavaBeans). It seems to be a feature of Pyrolite, and I can't think of any obvious use for it in the context of RDDs. For example, Pyrolite maps a JavaBean to a dict of its attributes in Python, but one can't go in reverse. Listing it as a supported data type may confuse users. Thoughts?
Regarding the JavaBeans, is there a reason to believe Pyrolite won't support them in the future? Or are you just suggesting to remove it because we can't also save data? That would be a bit of a regression for the reading side, though maybe InputFormats that return JavaBeans are not that common.
@kanzhang @mateiz Yeah, this is one issue with Pyrolite vs MsgPack. MsgPack supported case classes out of the box, which would likely be a bit more common than beans. I'd say that custom serde via
Thinking about it some more, I would be ok to remove it from the docs. It would still be available as undocumented functionality, so if relevant use cases did come up on the mailing list we could point to it, and in the unlikely case that there was demand we could simply document it as read-only functionality. Bear in mind this is also still marked experimental; we'll need to see how users use it in the wild a bit and make any amendments as required.
@MLnick I merely removed it from the programming guide. The functionality (and your test) is still there, should anyone want to try it. @mateiz I meant that when reading JavaBeans, you get a dict of attributes to values on the Python side, but you can't turn around and save it as JavaBeans from Python. What you save is a Java Map, since that's what Pyrolite converts a dict to. I was trying to confirm the same asymmetry on the saving side (i.e., saving a Python custom object as a Java Map and reading it back as a Python dict), but I got the following exception and gave up.
QA tests have started for PR 1338. This patch merges cleanly.
QA results for PR 1338:
Major changes for the updated patch.
@pwendell I renamed the file HBaseConverter.scala to HBaseConverters.scala, and now it fails the Scala style checks. How can I fix it? Thx.
The style check error is different, see https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16994/consoleFull. It's a bit hidden in there, but it says:
@@ -31,13 +31,14 @@ import org.apache.spark.annotation.Experimental
 * transformation code by overriding the convert method.
 */
@Experimental
trait Converter[T, U] extends Serializable {
trait Converter[T, +U] extends Serializable {
Actually the style checker seems to be complaining about this +, which is a mistake in the style checker. You can add a space after the + for now. But do we really need covariance here?
(For better or worse, we don't really use it elsewhere in Spark)
@mateiz thanks, Matei. I saw it but I couldn't believe that was the reason :-). I added the + sign because some of our converters have more specific types like [Any, Writable], and the compiler complains when assigning them where [Any, Any] is required. I don't have a strong preference here and could change them back to [Any, Any]. Let me know.
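To make the covariance point concrete, here is a small REPL-style sketch (the converter class is illustrative, not code from this patch):

```scala
import org.apache.hadoop.io.{IntWritable, Writable}

trait Converter[T, +U] extends Serializable {
  def convert(obj: T): U
}

// An illustrative converter with a more specific output type.
class ToWritableConverter extends Converter[Any, Writable] {
  def convert(obj: Any): Writable = new IntWritable(obj.asInstanceOf[Int])
}

// With the covariant +U this compiles, because Converter[Any, Writable]
// is a subtype of Converter[Any, Any]; with an invariant U it would not.
val c: Converter[Any, Any] = new ToWritableConverter
```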
QA tests have started for PR 1338. This patch merges cleanly.
QA results for PR 1338:
Now I have got the following error, since
QA tests have started for PR 1338. This patch merges cleanly.
Nevermind. I'm refactoring.
QA results for PR 1338:
QA tests have started for PR 1338. This patch merges cleanly.
QA results for PR 1338:
QA tests have started for PR 1338. This patch merges cleanly.
QA tests have started for PR 1338. This patch merges cleanly.
QA results for PR 1338:
QA results for PR 1338:
I think we should remove the
}
pyRDD.mapPartitions { iter =>
  val unpickle = new Unpickler
  val unpickled =
This `batchSerialized`-respecting unpickling logic should probably live in its own function so that it can also be used by `pythonToJavaMap`.
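A rough sketch of what such a shared helper could look like (assuming Pyrolite's `Unpickler` and a `batchSerialized` flag; the name and placement are illustrative, not the code in this patch):

```scala
import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConverters._

import net.razorvine.pickle.Unpickler
import org.apache.spark.rdd.RDD

// Unpickle an RDD of pickled byte arrays coming from Python, flattening
// each pickled batch into individual elements when the RDD was written
// with a batched serializer.
def pythonToJava(pyRDD: RDD[Array[Byte]], batchSerialized: Boolean): RDD[Any] = {
  pyRDD.mapPartitions { iter =>
    val unpickle = new Unpickler
    iter.flatMap { bytes =>
      val obj = unpickle.loads(bytes)
      if (batchSerialized) obj.asInstanceOf[JArrayList[Any]].asScala
      else Seq(obj)
    }
  }
}
```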
Can we defer this refactoring to when we update `pythonToJavaMap`, since I don't want to touch SchemaRDD code in this patch?
The problem with that is that currently PythonRDD objects are only created by PipelinedRDD, whereas in other cases (e.g.,
Ah, I see. I don't mind deferring that refactoring to a later patch. I'll create some PySpark refactoring JIRAs later.
I've merged this. Thanks!
JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024

This PR is a followup to apache#455 and adds capabilities for saving PySpark RDDs using SequenceFile or any Hadoop OutputFormats.

* Added RDD methods `saveAsSequenceFile`, `saveAsHadoopFile` and `saveAsHadoopDataset`, for both old and new MapReduce APIs.
* Default converter for converting common data types to Writables. Users may specify custom converters to convert to desired data types.
* No out-of-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter will convert any custom ArrayWritable subtypes to `Object[]` and they get pickled to Python tuples.
* Added HBase and Cassandra output examples to show how custom output formats and converters can be used.

cc MLnick mateiz ahirreddy pwendell

Author: Kan Zhang <[email protected]>

Closes apache#1338 from kanzhang/SPARK-2024 and squashes the following commits:

c01e3ef [Kan Zhang] [SPARK-2024] code formatting
6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
d998ad6 [Kan Zhang] [SPARK-2024] refectoring to get method params below 10
57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding couple unbatched cases
7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark
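For readers, a rough sketch of what a "default converter" from common JVM types to Writables could look like (illustrative only, mirroring the conversion table in the programming guide; not necessarily the code in this patch):

```scala
import org.apache.hadoop.io._

// Illustrative default conversion from common types to Writables; users can
// plug in custom Converter implementations for anything not covered here.
def toWritable(obj: Any): Writable = obj match {
  case i: Int             => new IntWritable(i)
  case d: Double          => new DoubleWritable(d)
  case l: Long            => new LongWritable(l)
  case f: Float           => new FloatWritable(f)
  case b: Boolean         => new BooleanWritable(b)
  case s: String          => new Text(s)
  case bytes: Array[Byte] => new BytesWritable(bytes)
  case null               => NullWritable.get()
  case other =>
    throw new IllegalArgumentException(s"Unsupported type: ${other.getClass}")
}
```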