[SPARK-43039][SQL] Support custom fields in the file source _metadata column. #40677
Conversation
Force-pushed from 79c83f7 to 6d35127.
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -264,9 +261,11 @@ object FileFormat {
      fileSize: Long,
      fileBlockStart: Long,
      fileBlockLength: Long,
      fileModificationTime: Long): InternalRow = {
      fileModificationTime: Long,
      otherConstantMetadataColumnValues: Map[String, Any]): InternalRow = {
How is `otherConstantMetadataColumnValues` generated? `FileFormat` doesn't have an API for it.
See the unit tests -- the `FileIndex.listFiles` implementation is responsible for providing it as part of the `PartitionDirectory` it creates for each file.
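A minimal sketch of that arrangement (assuming the `FileStatusWithMetadata` wrapper and `PartitionDirectory` shape this PR introduces; the `custom_tag` field name is purely illustrative):

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionDirectory}

// A custom FileIndex would do something like this inside its listFiles() override:
// attach a constant metadata value to each file it lists.
def toPartitionDirectory(partitionValues: InternalRow, files: Seq[FileStatus]): PartitionDirectory = {
  val filesWithMetadata = files.map { f =>
    // "custom_tag" is a hypothetical field name; here its value is just the file name.
    FileStatusWithMetadata(f, Map("custom_tag" -> f.getPath.getName))
  }
  PartitionDirectory(partitionValues, filesWithMetadata)
}
```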
case _: LongType | _: IntegerType | _: ShortType | _: ByteType => true
case _: DoubleType | _: FloatType => true
case _: StringType => true
case _: TimestampType => true // really just Long
Should we also add `DateType` (int), `DayTimeIntervalType` (long), and `YearMonthIntervalType` (int)?
Good catch. Fixed by using `Literal`, `PhysicalType`, and `ColumnVectorUtils.populate`, which also has the nice side effect of simplifying the code.
 */
def createFileMetadataCol(): AttributeReference = {
  // Strip out the fields' metadata to avoid exposing it to the user. [[FileSourceStrategy]]
  // avoids confusion by mapping back to [[metadataSchemaFields]].
Nit, but in these regular comments we could just use backticks; `[[...]]` is Scaladoc syntax, not comment syntax.
I personally find the brackets more readable (and my editor likes them better than backticks as well).
Is there a rule against using them in normal comments?
// Other metadata columns use the file-provided value (if any). As a courtesy, convert any
// normal strings to the required [[UTF8String]].
//
// TODO(frj): Do we need to potentially support value-producing functions?
Could we file a JIRA and write it as `TODO(SPARK-XXXXX)` instead?
Removing for now, because the TODO doesn't add any meaningful information.
If/when the need arises, it will be obvious enough that this code needs to change.
 * custom file-constant metadata columns, but in general tasks and readers can use the per-file
 * metadata however they see fit.
 */
case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
Let's think more about the API design. I think it's too fragile to use `Any` in the API without a well-defined rule for what values are actually allowed. I'd suggest using `Map[String, Literal]`. Then we can remove `def isSupportedType`, as all types can be supported.
See my TODO above... we may need to consider supporting value-producing functions, to allow full pruning in cases where the value is somehow expensive to compute. Requiring `Literal` would block that (and AFAIK only `Any` could capture both `Literal` and `() => Literal`).

The `FILE_PATH` case that calls `Path.toString`, and the call sites of `PartitionedFile`, are a small example of that possibility, and they got me thinking -- what if, instead of passing length, path, etc. as arguments, we just passed the actual file status and used the extractors on it? It probably doesn't make sense to actually do that for the hard-wired cases, though.
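Purely as an illustration of that flexibility (none of these names are in the PR): with `Map[String, Any]`, a provider could hand over either an eager `Literal` or a thunk that is only evaluated if the column survives pruning.

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

// Placeholder for a value that is expensive to compute per file.
def expensiveChecksum(): String = "..."

// Eagerly computed constant value.
val eager: Map[String, Any] = Map("file_checksum" -> Literal("abc"))

// Lazily computed value: the cost is only paid if the metadata field is actually read.
val deferred: Map[String, Any] = Map("file_checksum" -> (() => Literal(expensiveChecksum())))
```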
I do like the idea of supporting `Literal` as one of the supported cases -- it simplifies the type checking a bit, in that the "supported" primitive types are merely those for which the implementation will automatically create the `Literal` wrapper as a courtesy (similar to string vs. `UTF8String`).
Update: I remember now another reason why I had added `isSupportedDataType` -- `ConstantColumnVector` (needed by `FileScanRDD...createMetadataColumnVector` below) supports a limited subset of types, and relies on type-specific getters and setters. Even if I wrote the (complex recursive) code to handle structs, maps, and arrays... we still wouldn't have complete coverage for all types.

Do we know for certain that `ConstantColumnVector` supports all types that can ever be encountered during vectorized execution? If not, we must keep the `isSupportedDataType` method I introduced, regardless of whether we choose to add support for metadata fields with complex types in this PR.
Update: `ConstantColumnVector` looks like an incompletely implemented API... it "supports" array/map/struct on the surface (e.g. `ConstantColumnVectorSuite` has superficial tests for it), but e.g. `ColumnVectorUtils.populate` doesn't actually handle them, and `ColumnVectorUtilsSuite.scala` has negative tests to verify that they cannot be used in practice.

As far as I can tell, the class really only supports data types that can be used as partition columns.
Updated the doc comment here to explain that file-source metadata fields are only one possible use for the extra file metadata (which is conceptually at a deeper layer than catalyst and `Literal`). Also updated the `isSupportedType` doc comment to explain why not all types are supported.

Relevant implementation details:

- It would take a lot of work to support all data types, regardless of whether we use `Literal` vs. `Any`.
- We end up wrapping the provided value in a call to `Literal(_)` anyway, because doing so simplifies null handling by making null-because-missing equivalent to null-because-null. At that point, we get wrapping of primitive values "for free" if we happen to pass `Any` instead.
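A sketch of that wrapping (the helper name is hypothetical): missing and explicit-null values collapse to the same null literal, and bare primitives get lifted to `Literal` as a courtesy.

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

def metadataValueToLiteral(metadata: Map[String, Any], name: String): Literal =
  metadata.get(name) match {
    case Some(lit: Literal) => lit            // already a Literal: use as-is
    case Some(value)        => Literal(value) // courtesy wrapping of primitive values
    case None               => Literal(null)  // null-because-missing behaves like null-because-null
  }
```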
Force-pushed from 6d35127 to 51359c1.
case PhysicalNullType => true
case PhysicalBooleanType => true
case PhysicalByteType | PhysicalShortType | PhysicalIntegerType | PhysicalLongType => true
case PhysicalFloatType | PhysicalDoubleType => true
nit: `case _: PhysicalPrimitiveType => true`
Hmm... it's currently true that `ColumnVectorUtils.populate` supports all physical primitive types, but somebody neglected to make `PhysicalPrimitiveType` a sealed trait. I'll fix that and simplify the match here.
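Roughly the shape the simplified check could take (a sketch only, assuming the Spark 3.5-era `PhysicalDataType` hierarchy and a sealed `PhysicalPrimitiveType`):

```scala
import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalPrimitiveType}
import org.apache.spark.sql.types.{DataType, StringType}

def isSupportedType(dataType: DataType): Boolean = dataType match {
  case StringType => true                      // stored as UTF8String
  case _ => PhysicalDataType(dataType) match {
    case _: PhysicalPrimitiveType => true      // null, boolean, and the numeric/interval types
    case _ => false                            // arrays/maps/structs: not handled by ConstantColumnVector
  }
}
```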
 * NOTE: It is not possible to change the semantics of the base metadata fields by overriding this
 * method. Technically, a file format could choose to suppress them, but that is not recommended.
 */
def metadataSchemaFields: Seq[StructField] = FileFormat.BASE_METADATA_FIELDS
I'm wondering if we should have two APIs:

- `def constantMetadataColumns: Seq[StructField]`
- `def generatedMetadataColumns: Seq[StructField]`

Then Spark can add the metadata fields itself, which means less work for the implementations.
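Something along the lines of this hypothetical surface (not what the PR ultimately implements):

```scala
import org.apache.spark.sql.types.StructField

// Hypothetical alternative API shape: the implementation only lists its custom fields,
// and Spark itself decides how to tag them and assemble the final _metadata schema.
trait FileFormatMetadataColumns {
  def constantMetadataColumns: Seq[StructField] = Nil   // filled from per-file metadata
  def generatedMetadataColumns: Seq[StructField] = Nil  // produced by the reader itself
}
```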
Thinking about it more, how can a file source define custom constant metadata columns? The file listing logic is shared for all file sources and I can't think of a way to customize it for certain file sources.
It needs a custom `FileIndex` to go with the `FileFormat` (see the unit test for an example).
Got it. How about my first comment? Or do we expect the implementations to properly separate constant and generated metadata columns by using those util objects?
Could you elaborate on what we gain by splitting out two lists? For generated columns, in particular, we must use the helper object because the user should specify the physical column name to use.
I'm trying to make it easier for third-party file sources to implement the new functions. The fewer internal details we expose through API, the more API stability we have.
But we don't have a choice here. The implementation needs to specify the physical column name, and we must expose these details.
At least the surface is pretty minimal (nobody needs to know the specific metadata tags that get used): instead of saying `StructField(name, dataType, nullable)`, they pick one of:

- `FileSourceConstantMetadataStructField(name, dataType, nullable)`
- `FileSourceGeneratedMetadataStructField(name, internalName, dataType, nullable)`
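For example, a third-party format could extend the default schema along these lines (a sketch; the field names are made up, and the factory signatures are the ones quoted above):

```scala
import org.apache.spark.sql.execution.datasources.{FileSourceConstantMetadataStructField, FileSourceGeneratedMetadataStructField}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{LongType, StringType, StructField}

class MyFileFormat extends ParquetFileFormat {
  override def metadataSchemaFields: Seq[StructField] =
    super.metadataSchemaFields ++ Seq(
      // Constant per-file value, supplied by a matching custom FileIndex at listing time.
      FileSourceConstantMetadataStructField("custom_tag", StringType, true),
      // Value generated by the reader, stored under a separate internal column name.
      FileSourceGeneratedMetadataStructField("record_offset", "_internal_record_offset", LongType, true))
}
```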
Force-pushed from 51359c1 to 0612742.
thanks, merging to master!
Referenced by Delta PR #2888 ("…sting DV Information", [Spark]):

Back then, we relied on an expensive Broadcast of DV files (#1542) to pass the DV files to the associated Parquet files. With the introduction of adding custom metadata to files (apache/spark#40677) in Spark 3.5, we can now pass the DV through the custom metadata field; this is expected to improve the performance of DV reads in Delta.

How was this patch tested? Adjusted the existing UTs that cover our changes.

Does this PR introduce any user-facing changes? No.
What changes were proposed in this pull request?
Allow `FileFormat` instances to define the schema of the `_metadata` column they expose.

Why are the changes needed?
Today, the schema of the file source `_metadata` column depends on the file format (e.g. the Parquet file format supports `_metadata.row_index`), but this is hard-wired into the `FileFormat` itself. Not only is this an ugly design, it also prevents custom file formats from adding their own fields to the `_metadata` column.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
New unit tests.
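For context, a hedged sketch of what this enables end to end (the format name and `custom_tag` field are hypothetical; `file_path` is one of the built-in constant fields):

```scala
import org.apache.spark.sql.functions.col

// Assumes "my_format" is backed by a FileFormat/FileIndex pair that exposes a
// custom constant field named "custom_tag" under _metadata.
val df = spark.read.format("my_format").load("/path/to/data")

df.select(
    col("value"),                 // a regular data column
    col("_metadata.file_path"),   // built-in file-constant metadata field
    col("_metadata.custom_tag"))  // custom field contributed by the format
  .show()
```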