Skip to content

Commit

Permalink
Avoiding duplicate Parquet schema merging.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Feb 26, 2015
1 parent e43139f commit ef78a5a
Showing 1 changed file with 7 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -467,22 +467,13 @@ private[parquet] class FilteringParquetRowInputFormat
return splits
}

Option(globalMetaData.getKeyValueMetaData.get(RowReadSupport.SPARK_METADATA_KEY)).foreach {
schemas =>
val mergedSchema = schemas
.map(DataType.fromJson(_).asInstanceOf[StructType])
.reduce(_ merge _)
.json

val mergedMetadata = globalMetaData
.getKeyValueMetaData
.updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(mergedSchema)))

globalMetaData = new GlobalMetaData(
globalMetaData.getSchema,
mergedMetadata,
globalMetaData.getCreatedBy)
}
val metadata = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
val mergedMetadata = globalMetaData
.getKeyValueMetaData
.updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(metadata)))

globalMetaData = new GlobalMetaData(globalMetaData.getSchema,
mergedMetadata, globalMetaData.getCreatedBy)

val readContext = getReadSupport(configuration).init(
new InitContext(configuration,
Expand Down

0 comments on commit ef78a5a

Please sign in to comment.