
[SPARK-2759][CORE] Generic Binary File Support in Spark #1658

Closed · wants to merge 30 commits

Conversation

@kmader (Contributor) commented Jul 30, 2014

This patch adds the abstract BinaryFileInputFormat and BinaryRecordReader classes for reading data in as a byte stream and converting it to another format via the def parseByteArray(inArray: Array[Byte]): T function.
As a trivial example, ByteInputFormat and ByteRecordReader are included, which simply return the Array[Byte] of a given file.
Finally, an RDD for BinaryFileInputFormat was added (to allow for easier partitioning changes, as was done for WholeFileInput), along with a corresponding byteFiles method on SparkContext so the functionality can easily be used by others.
A common use case might be to read in a folder of files:

sc.byteFiles("s3://mydrive/tif/*.tif").map(rawData => ReadTiffFromByteArray(rawData))
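
For illustration, a minimal sketch of the trivial reader described above; the constructor parameters follow Hadoop's CombineFileRecordReader convention and are an assumption, only parseByteArray is confirmed by this description:

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

// Trivial reader: the raw bytes already are the target format, so parsing is the identity.
class ByteRecordReader(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
  extends BinaryRecordReader[Array[Byte]](split, context, index) {

  override def parseByteArray(inArray: Array[Byte]): Array[Byte] = inArray
}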

@mateiz (Contributor) commented Jul 30, 2014

Do you mind opening a JIRA issue on https://issues.apache.org/jira/browse/SPARK to track this?

Also, I wonder if we should make the API just return an RDD of InputStreams. That way users can read directly from a stream and don't need to load the whole file in memory into a byte array. The only awkward thing is that calling cache() on an RDD of InputStreams wouldn't work, but hopefully this is obvious (and will be documented). Or if that doesn't sound good, we could return some objects that let you open a stream repeatedly (some kind of BinaryFile object with a stream method).
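
A minimal sketch of that second option, a wrapper object that can open a fresh stream on demand (the BinaryFile name and its shape are hypothetical here, not part of the patch):

import java.io.DataInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical wrapper: remembers where the bytes live so a fresh stream can be
// opened on every call, which keeps the RDD cheap to re-compute.
class BinaryFile(val path: String) extends Serializable {
  def open(): DataInputStream = {
    val p = new Path(path)
    // A Hadoop Configuration is built locally because it is not serializable.
    new DataInputStream(p.getFileSystem(new Configuration()).open(p))
  }
}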

@mateiz (Contributor) commented Jul 30, 2014

Jenkins, this is ok to test

*
* @note Small files are preferred; large files are also allowed, but may cause poor performance.
*/
def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
Review comment (Contributor):

I'd call this binaryFiles.

Also, please add it to JavaSparkContext, and ideally we'd have a way to add it to Python as well. That one will be trickier -- we probably need to read the file in chunks and pass them to Python. But I think it's important to design the API as part of this change.

@SparkQA commented Jul 30, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17505/consoleFull

@SparkQA commented Jul 30, 2014

QA results for PR 1658:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
@serializable abstract class BinaryRecordReader[T](
@serializable class ByteInputFormat extends BinaryFileInputFormat[Array[Byte]] {
@serializable class ByteRecordReader(

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17505/consoleFull

…ion for more complicated readers (HDF5 perhaps), and renamed several of the functions and files to be more consistent. Also added parallel functions to the java api
@SparkQA commented Jul 31, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17517/consoleFull

@SparkQA commented Jul 31, 2014

QA results for PR 1658:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class StreamBasedRecordReader[T](
class StreamRecordReader(
* A class for extracting the information from the file using the BinaryRecordReader (as Byte array)
class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
abstract class BinaryRecordReader[T](
class ByteRecordReader(
* A class for extracting the information from the file using the BinaryRecordReader (as Byte array)
class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17517/consoleFull

@SparkQA commented Jul 31, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17523/consoleFull

@kmader (Contributor, Author) commented Jul 31, 2014

Thanks for the feedback. I have made the requested changes, created an issue (https://issues.apache.org/jira/browse/SPARK-2759), and added a dataStreamFiles method to both SparkContext and JavaSparkContext which returns the DataInputStream itself (I have a feeling this might create a few more issues with serialization, properly closing streams, or rerunning tasks, but I guess we'll see).

My recommendation (and what I have done in my own code) would be to use the abstract class StreamBasedRecordReader and implement an appropriate version for custom file types by overriding def parseStream(inStream: DataInputStream): T
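
As a hedged sketch of that recommendation, a reader for a hypothetical format whose files begin with a 4-byte record count might look roughly like this (the constructor parameters are assumed; only the parseStream signature is confirmed above):

import java.io.DataInputStream
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

// Hypothetical reader: pulls a count-prefixed list of doubles out of each file
// without materializing the whole file as a byte array.
class DoubleListRecordReader(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
  extends StreamBasedRecordReader[Array[Double]](split, context, index) {

  override def parseStream(inStream: DataInputStream): Array[Double] = {
    val count = inStream.readInt()
    Array.fill(count)(inStream.readDouble())
  }
}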

As for PySpark, my guess is that it would be easiest to create a library of StreamBasedRecordReader classes for common file types, since it is much less expensive to do I/O at the Scala/Java level. Alternatively, a Spark function could copy the file into a local directory on demand and provide the local filename to Python.

@kmader changed the title from "Generic Binary File Support in Spark" to "[SPARK-2759][CORE] Generic Binary File Support in Spark" on Jul 31, 2014
@SparkQA commented Jul 31, 2014

QA results for PR 1658:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class StreamBasedRecordReader[T](
class StreamRecordReader(
class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
abstract class BinaryRecordReader[T](
class ByteRecordReader(
* A class for reading the file using the BinaryRecordReader (as Byte array)
class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17523/consoleFull

@freeman-lab (Contributor) commented:

@kmader @mateiz this looks really useful! I was about to submit a related PR for an InputFormat that reads and splits large flat binary files into records (of a specified length), rather than reading each file as a single record, as here. We find this is the easiest way for users to bring large numerical data from existing NumPy / Matlab pipelines into Spark. Here's a gist. Would these be compatible? Perhaps, analogous to the text case, we could have both byteFile and wholeByteFiles?
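
A sketch of the use case described here: given an RDD of fixed-length byte records (however it is produced, e.g. via the fixedLengthBinaryFiles / binaryRecords methods discussed later in this thread), flat binary output from NumPy's tofile() can be turned back into numeric vectors. The record layout (8 little-endian doubles per record) is an assumption for illustration:

import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.rdd.RDD

// Each record is assumed to be 8 doubles written little-endian by NumPy,
// i.e. a record length of 8 * 8 = 64 bytes.
def toVectors(records: RDD[Array[Byte]], doublesPerRecord: Int = 8): RDD[Array[Double]] =
  records.map { bytes =>
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    Array.fill(doublesPerRecord)(buf.getDouble)
  }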

@kmader (Contributor, Author) commented Aug 13, 2014

@freeman-lab looks good, I will add it to this pull request if that's ok with you. My personal preference would be to keep binaryFiles for standard operations and fixedLengthBinaryFiles for other files, since many standard binary formats are not so easily partitionable; trying to read tif, jpg, or even hdf5 and raw files under such conditions would be rather difficult to do correctly, whereas for text files, line-by-line is a common partitioning. Perhaps there are other use cases I am not familiar with that speak against this, though.

…and added them to both the JavaSparkContext and the SparkContext as fixedLengthBinaryFile
@SparkQA commented Aug 13, 2014

QA tests have started for PR 1658. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18439/consoleFull

@SparkQA commented Aug 13, 2014

QA results for PR 1658:
- This patch FAILED unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18439/consoleFull

* @param path Directory containing the input data files
* @return An RDD of byte-array records, RDD[Array[Byte]]
*/
def fixedLengthBinaryFiles(path: String): RDD[Array[Byte]] = {
Review comment (Contributor, Author):

This has been taken almost directly from
https://github.com/freeman-lab/thunder/blob/master/scala/src/main/scala/thunder/util/Load.scala, without the extra formatting to load it as a list of doubles.

@SparkQA commented Aug 13, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18441/consoleFull

@SparkQA commented Aug 13, 2014

QA results for PR 1658:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable] {
class FixedLengthBinaryRecordReader extends RecordReader[LongWritable, BytesWritable] {
abstract class StreamBasedRecordReader[T](
class StreamRecordReader(
class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
abstract class BinaryRecordReader[T](
class ByteRecordReader(
* A class for reading the file using the BinaryRecordReader (as Byte array)
class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18441/consoleFull

@mateiz (Contributor) commented Aug 13, 2014

Hey, sorry for taking a bit of time to get back to this (I've been looking at 1.1 stuff), but I have a few comments on the API:

  • Do we need both a stream API and a byte array one? I'd personally offer only the stream one because it's less likely to cause crashes (with the other one there's a risk of OutOfMemoryError).
  • For the files vs fixed-length records, maybe we can call the methods binaryFiles and binaryRecords.
  • Are you planning to create saveAsBinaryFiles / saveAsBinaryRecords too? We don't have to have it in this PR but it would be useful.
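
For reference, a hedged sketch of how the stream-based variant suggested here could be used; the binaryFiles name and the PortableDataStream open() call follow the naming and QA output that appear later in this thread, and readTiff is a hypothetical user function:

// One (path, stream) pair per file; the stream is opened lazily inside the task,
// so a whole file never has to sit in memory as a byte array.
val images = sc.binaryFiles("s3://mydrive/tif/*.tif").map { case (path, stream) =>
  val in = stream.open()   // a java.io.DataInputStream
  try {
    (path, readTiff(in))   // readTiff: hypothetical user parsing function
  } finally {
    in.close()
  }
}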

*
* @param minPartitions A suggested minimum number of partitions for the input data.
*
* @note Care must be taken to close the files afterwards
Review comment (Contributor):

It is a bit unfortunate that users have to close the streams by hand. If you want to get around this, you can create a custom RDD wrapping around the HadoopRDD, whose compute() method can add a cleanup hook to its TaskContext to close the stream. Take a look at TaskContext.addOnCompleteCallback().
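
A rough sketch of that suggestion; the ClosingStreamRDD wrapper and its shape are hypothetical, and only TaskContext.addOnCompleteCallback is taken from the comment above:

import java.io.DataInputStream
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper RDD: every stream handed out by the parent is closed
// automatically when the task completes, so users never close streams by hand.
class ClosingStreamRDD(parent: RDD[DataInputStream])
  extends RDD[DataInputStream](parent) {

  override protected def getPartitions: Array[Partition] = parent.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[DataInputStream] = {
    val opened = scala.collection.mutable.ArrayBuffer.empty[DataInputStream]
    // Cleanup hook from the comment above: runs once the task finishes.
    context.addOnCompleteCallback(() => opened.foreach(_.close()))
    parent.iterator(split, context).map { stream =>
      opened += stream
      stream
    }
  }
}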

Review comment (Contributor):

Hey Kevin, is this @note still relevant? Using addOnCompleteCallback, you might be able to avoid this.

@SparkQA commented Oct 20, 2014

QA tests have started for PR 1658 at commit 92bda0d.

  • This patch merges cleanly.

@SparkQA commented Oct 20, 2014

QA tests have finished for PR 1658 at commit 92bda0d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kmader (Contributor, Author) commented Oct 20, 2014

So I made the requested changes and added a few more tests, but the tests appear not to have run, for a strange reason: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21922/console. The build runs out of memory with Maven but works fine in IntelliJ, and I don't get any feedback on the style. Is there a single Maven phase I can run to check that?

@mateiz (Contributor) commented Oct 21, 2014

There might've been some Jenkins issues recently; going to restart it.

@mateiz (Contributor) commented Oct 21, 2014

BTW for the style, you can do "sbt/sbt scalastyle" locally if you want. Not sure there's a command in Maven.

@mateiz (Contributor) commented Oct 21, 2014

Jenkins, retest this please

@SparkQA commented Oct 21, 2014

QA tests have started for PR 1658 at commit 92bda0d.

  • This patch merges cleanly.

@SparkQA commented Oct 21, 2014

QA tests have finished for PR 1658 at commit 92bda0d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 21, 2014

QA tests have started for PR 1658 at commit 8ac288b.

  • This patch merges cleanly.

@SparkQA commented Oct 21, 2014

QA tests have finished for PR 1658 at commit 8ac288b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,

@SparkQA commented Oct 21, 2014

QA tests have started for PR 1658 at commit 6379be4.

  • This patch merges cleanly.

@SparkQA commented Oct 21, 2014

QA tests have finished for PR 1658 at commit 6379be4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,

@mateiz (Contributor) commented Oct 28, 2014

Thanks for the update, Kevin. Note that there are still a few comments from me on https://github.com/apache/spark/pull/1658/files; do you mind dealing with those?

@mateiz (Contributor) commented Oct 29, 2014

@kmader btw if you don't have time to deal with these comments, let me know; I might be able to take the patch from where it is and implement them.

@mateiz (Contributor) commented Oct 30, 2014

Thanks for the update, Kevin. Looks like Jenkins had some issues with git, will retry it.

@mateiz (Contributor) commented Oct 30, 2014

Jenkins, retest this please

@SparkQA commented Oct 30, 2014

Test build #22503 has finished for PR 1658 at commit 3c49a30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 30, 2014

Test build #22505 has finished for PR 1658 at commit 3c49a30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,

@asfgit closed this in 7136719 on Nov 1, 2014
@mateiz (Contributor) commented Nov 1, 2014

Thanks @kmader, I've merged this now. I manually amended the patch a bit to fix style issues (there were still a bunch of commas without spaces, etc.), and I also changed the name of the recordLength property in Hadoop JobConfs to start with org.apache.spark so that it's less likely to clash with other Hadoop properties. Finally, I marked this API as @Experimental for now since it's new in this release, though we can probably make it non-experimental in 1.3.

classOf[LongWritable],
classOf[BytesWritable],
conf = conf)
val data = br.map { case (k, v) => v.getBytes }
Review comment (Contributor):

It turns out that getBytes returns a padded byte array, so I think you may need to copy / slice out the subarray with the data using v.getLength; see HADOOP-6298: "BytesWritable#getBytes is a bad name that leads to programming mistakes" for more details.

Using getBytes without getLength has caused bugs in Spark in the past: #2712.

Is the use of getBytes in this patch a bug? Or is it somehow safe due to our use of FixedLengthBinaryInputFormat? If it is somehow safe, we should have a comment which explains this so that readers who know about the getBytes issue aren't confused (or better yet, an assert that getBytes returns an array of the expected length).
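
A minimal sketch of the defensive version suggested here; only the getLength-bounded copy is the point, and br refers to the RDD in the quoted diff above:

import java.util.Arrays

// BytesWritable#getBytes may return a padded buffer (HADOOP-6298); copy only
// the valid prefix indicated by getLength.
val data = br.map { case (k, v) => Arrays.copyOfRange(v.getBytes, 0, v.getLength) }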
