Skip to content

Commit

Permalink
Assembles requested schema from Parquet file schema
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Jul 8, 2015
1 parent bcac49f commit 6437d4b
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,20 @@ private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logg
val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
val maybeRequestedSchema = Option(conf.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA))

val parquetSchema = maybeRequestedSchema.map { schemaString =>
val toParquet = new CatalystSchemaConverter(conf)
toParquet.convert(StructType.fromString(schemaString))
}.getOrElse(context.getFileSchema)
val parquetRequestedSchema =
maybeRequestedSchema.map { schemaString =>
StructType.fromString(schemaString).map { field =>
val fieldType = context.getFileSchema.asGroupType().getType(field.name)
new MessageType("root", fieldType)
}.reduce(_ union _)
}.getOrElse(context.getFileSchema)

val metadata =
Map.empty[String, String] ++
maybeRequestedSchema.map(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)

new ReadContext(parquetSchema, metadata)
new ReadContext(parquetRequestedSchema, metadata)
}
}

Expand Down

0 comments on commit 6437d4b

Please sign in to comment.