[SPARK-2759][CORE] Generic Binary File Support in Spark #1658
Conversation
Updating to the latest Spark repository
Do you mind opening a JIRA issue on https://issues.apache.org/jira/browse/SPARK to track this? Also, I wonder if we should make the API just return an RDD of InputStreams. That way users can read directly from a stream and don't need to load the whole file into memory as a byte array. The only awkward thing is that calling cache() on an RDD of InputStreams wouldn't work, but hopefully this is obvious (and will be documented). Or if that doesn't sound good, we could return some objects that let you open a stream repeatedly (some kind of BinaryFile object with a stream method).
Jenkins, this is ok to test
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*/
def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
I'd call this binaryFiles. Also, please add it to JavaSparkContext, and ideally we'd have a way to add it to Python as well. That one will be trickier -- we probably need to read the file in chunks and pass them to Python. But I think it's important to design the API as part of this change.
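For reference, a minimal sketch of what such a Java-friendly wrapper could look like, assuming the byteFiles signature from the diff above (the wrapper object and its method name are illustrative, not the merged API):

```scala
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

object BinaryFilesJavaApi {
  // Expose the Scala-side method (assumed here to be the byteFiles shown above)
  // as a JavaPairRDD so it can be called comfortably from Java.
  def binaryFiles(jsc: JavaSparkContext, path: String, minPartitions: Int): JavaPairRDD[String, Array[Byte]] =
    JavaPairRDD.fromRDD(jsc.sc.byteFiles(path, minPartitions))
}
```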
QA tests have started for PR 1658. This patch merges cleanly.
QA results for PR 1658:
…ion for more complicated readers (HDF5 perhaps), and renamed several of the functions and files to be more consistent. Also added parallel functions to the java api
QA tests have started for PR 1658. This patch merges cleanly.
QA results for PR 1658:
QA tests have started for PR 1658. This patch merges cleanly.
Thanks for the feedback. I have made the changes requested, created an issue (https://issues.apache.org/jira/browse/SPARK-2759), and added a dataStreamFiles method to both SparkContext and JavaSparkContext which returns the DataInputStream itself (I have a feeling this might create a few new issues with serialization, properly closing streams, or rerunning tasks, but I guess we'll see). My recommendation (as I have done in my own code) would be to use the abstract class. As for PySpark, my guess is that it would be easiest to create a library of StreamBasedRecordReader classes for common file types, since it is much less expensive to do IO at the Scala/Java level. Alternatively, a Spark function could copy the file into a local directory on demand and provide the local filename to Python.
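A rough usage sketch of the dataStreamFiles idea described above, assuming it returns an RDD of (fileName, DataInputStream) pairs (the exact signature in the patch may differ):

```scala
import java.io.DataInputStream
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Read only the first four bytes (e.g. a magic number) of each file in a folder.
def firstIntPerFile(sc: SparkContext, path: String): RDD[(String, Int)] = {
  val streams: RDD[(String, DataInputStream)] = sc.dataStreamFiles(path) // proposed API
  streams.map { case (name, in) =>
    try {
      (name, in.readInt())
    } finally {
      in.close() // streams are not closed automatically (see the @note discussed later)
    }
  }
}
```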
QA results for PR 1658:
@kmader @mateiz this looks really useful! I was about to submit a related PR for an InputFormat that reads and splits large flat binary files into records (of a specified length), rather than reading one file per record as here. We find this is the easiest way for users to bring large numerical data from existing NumPy / Matlab pipelines into Spark. Here's a gist. Would these be compatible? Perhaps, analogous to the text case, we could have both byteFile and wholeByteFiles?
@freeman-lab looks good, I will add it to this pull request if that's ok with you. I think my personal preference would be to keep binaryFiles for standard operations and fixedLengthBinaryFiles for other files, since many standard binary formats are not so easily partitionable, and trying to read tif, jpg, even hdf5 and raw under such conditions will be rather difficult to do correctly, whereas for text files line-by-line is a common partitioning. Perhaps there are other use cases that I am not familiar with that speak against this, though.
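For context, a local, single-machine sketch of the fixed-length-record splitting that the proposed fixedLengthBinaryFiles / freeman-lab's input format would do in a distributed way (the function name and the divisibility check here are illustrative):

```scala
import java.nio.file.{Files, Paths}

// Split one flat binary file (e.g. a raw dump written from NumPy or Matlab)
// into records of exactly recordLength bytes each.
def splitIntoRecords(path: String, recordLength: Int): Iterator[Array[Byte]] = {
  val bytes = Files.readAllBytes(Paths.get(path))
  require(bytes.length % recordLength == 0,
    s"file length ${bytes.length} is not a multiple of record length $recordLength")
  bytes.grouped(recordLength)
}
```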
…and added them to both the JavaSparkContext and the SparkContext as fixedLengthBinaryFile
QA tests have started for PR 1658. This patch DID NOT merge cleanly!
QA results for PR 1658:
* @param path Directory to the input data files
* @return An RDD of the raw data, RDD[Array[Byte]]
*/
def fixedLengthBinaryFiles(path: String): RDD[Array[Byte]] = {
This has been taken almost directly from https://github.com/freeman-lab/thunder/blob/master/scala/src/main/scala/thunder/util/Load.scala, without the extra formatting to load it as a list of doubles.
QA tests have started for PR 1658. This patch merges cleanly.
QA results for PR 1658:
Hey, sorry for taking a bit of time to get back to this (I've been looking at 1.1 stuff), but I have a few comments on the API:
*
* @param minPartitions A suggested minimum number of partitions for the input data.
*
* @note Care must be taken to close the files afterwards
It is a bit unfortunate that users have to close the streams by hand. If you want to get around this, you can create a custom RDD wrapping around the HadoopRDD, whose compute() method can add a cleanup hook to its TaskContext to close the stream. Take a look at TaskContext.addOnCompleteCallback().
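A hedged sketch of that suggestion, assuming the stream-returning RDD yields (fileName, DataInputStream) pairs; addOnCompleteCallback is the TaskContext method named above (later Spark versions replaced it with addTaskCompletionListener):

```scala
import java.io.DataInputStream
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Wraps a stream-producing RDD and registers a cleanup hook on the TaskContext,
// so every stream opened by a task is closed when that task completes.
class AutoClosingStreamRDD(prev: RDD[(String, DataInputStream)])
  extends RDD[(String, DataInputStream)](prev) {

  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[(String, DataInputStream)] =
    prev.iterator(split, context).map { case (name, stream) =>
      context.addOnCompleteCallback(() => stream.close()) // close on task completion
      (name, stream)
    }
}
```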
Hey Kevin, is this @note still relevant? Using addOnCompleteCallback you might be able to avoid this.
…s, formatted code more nicely
QA tests have started for PR 1658 at commit
QA tests have finished for PR 1658 at commit
So I made the requested changes and added a few more tests, but the tests appear not to have run, for a strange reason: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21922/console. The build runs out of memory with Maven but works fine in IntelliJ. Also, I do not get any feedback on the style; is there any single Maven phase I can run to get that?
There might've been some Jenkins issues recently; going to restart it.
BTW for the style, you can do "sbt/sbt scalastyle" locally if you want. Not sure there's a command in Maven.
Jenkins, retest this please
QA tests have started for PR 1658 at commit
QA tests have finished for PR 1658 at commit
QA tests have started for PR 1658 at commit
QA tests have finished for PR 1658 at commit
QA tests have started for PR 1658 at commit
QA tests have finished for PR 1658 at commit
Thanks for the update, Kevin. Note that there are still a few comments from me on https://github.com/apache/spark/pull/1658/files; do you mind dealing with those?
@kmader btw if you don't have time to deal with these comments, let me know; I might be able to take the patch from where it is and implement them.
…functions to make their usage clearer
…n as in BinaryData files
Thanks for the update, Kevin. Looks like Jenkins had some issues with git, will retry it.
Jenkins, retest this please
Test build #22503 has finished for PR 1658 at commit
Test build #22505 has finished for PR 1658 at commit
Thanks @kmader, I merged this now. I manually amended the patch a bit to fix style issues (there were still a bunch of commas without spaces, etc.), and I also changed the name of the recordLength property in Hadoop JobConfs to start with org.apache.spark so that it's less likely to clash with other Hadoop properties. Finally, I marked this API as experimental.
classOf[LongWritable],
classOf[BytesWritable],
conf = conf)
val data = br.map { case (k, v) => v.getBytes }
It turns out that getBytes returns a padded byte array, so I think you may need to copy / slice out the subarray with the data using v.getLength; see HADOOP-6298: "BytesWritable#getBytes is a bad name that leads to programming mistakes" for more details.
Using getBytes without getLength has caused bugs in Spark in the past: #2712.
Is the use of getBytes in this patch a bug? Or is it somehow safe due to our use of FixedLengthBinaryInputFormat? If it is somehow safe, we should have a comment which explains this so that readers who know about the getBytes issue aren't confused (or better yet, an assert that getBytes returns an array of the expected length).
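For illustration, the kind of guard that comment is asking for, a copy that respects getLength rather than taking the padded backing array (a sketch, not the code that was merged):

```scala
import java.util.Arrays
import org.apache.hadoop.io.BytesWritable

// Only the first getLength bytes of the backing array hold record data;
// anything beyond that may be stale padding from Hadoop's buffer reuse.
def validBytes(v: BytesWritable): Array[Byte] =
  Arrays.copyOfRange(v.getBytes, 0, v.getLength)

// In the snippet above this would become:
//   val data = br.map { case (k, v) => validBytes(v) }
```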
The additions add the abstract BinaryFileInputFormat and BinaryRecordReader classes for reading in data as a byte stream and converting it to another format using the def parseByteArray(inArray: Array[Byte]): T function. As a trivial example, ByteInputFormat and ByteRecordReader are included, which just return the Array[Byte] from a given file. Finally, an RDD for BinaryFileInputFormat (to allow for easier partitioning changes, as was done for WholeFileInput) was added, along with the appropriate byteFiles method on the SparkContext, so the functions can be easily used by others. A common use case might be to read in a folder
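As a rough illustration of the intended usage, a sketch that follows the byteFiles name and RDD[(String, Array[Byte])] type from this description (the path is hypothetical, and the eventually merged API used different names and types):

```scala
import org.apache.spark.SparkContext

object ByteFilesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "byte-files-demo")
    // One (fileName, rawBytes) pair per file in the folder.
    val files = sc.byteFiles("/data/scans")
    files.map { case (name, bytes) => s"$name: ${bytes.length} bytes" }
      .collect()
      .foreach(println)
    sc.stop()
  }
}
```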