Simplify code.
yhuai committed Feb 26, 2015
1 parent cc1d472 commit 143927a
Showing 1 changed file with 15 additions and 32 deletions.
@@ -69,22 +69,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = synchronized {
         client.getTable(in.database, in.name)
       }
-      val schemaString = Option(table.getProperty("spark.sql.sources.schema"))
-        .orElse {
-          // If spark.sql.sources.schema is not defined, we either splitted the schema to multiple
-          // parts or the schema was not defined. To determine if the schema was defined,
-          // we check spark.sql.sources.schema.numOfParts.
-          Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
-            case Some(numOfParts) =>
-              val parts = (0 until numOfParts.toInt).map { index =>
-                Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
-                  .getOrElse("Could not read schema from the metastore because it is corrupted.")
-              }
-              // Stick all parts back to a single schema string in the JSON representation.
-              Some(parts.mkString)
-            case None => None // The schema was not defined.
-          }
-        }
+      val schemaString = Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
+        case Some(numOfParts) =>
+          val parts = (0 until numOfParts.toInt).map { index =>
+            Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
+              .getOrElse("Could not read schema from the metastore because it is corrupted.")
+          }
+          // Stick all parts back to a single schema string in the JSON representation.
+          Some(parts.mkString)
+        case None => None // The schema was not defined.
+      }
 
       val userSpecifiedSchema =
         schemaString.flatMap(s => Some(DataType.fromJson(s).asInstanceOf[StructType]))
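
Note on the read path above: the simplified code reassembles the schema JSON purely from spark.sql.sources.schema.numOfParts and the numbered part properties, concatenating the parts in index order. A minimal, self-contained sketch of that round trip, where SchemaReadSketch, props, and getProp are hypothetical stand-ins for the Hive table properties and table.getProperty used in the commit:

object SchemaReadSketch extends App {
  // Stand-in for the Hive table properties; the JSON string was split at an
  // arbitrary character boundary, not at a JSON structural boundary.
  val props = Map(
    "spark.sql.sources.schema.numOfParts" -> "2",
    "spark.sql.sources.schema.part.0" -> """{"type":"struc""",
    "spark.sql.sources.schema.part.1" -> """t","fields":[]}"""
  )

  // Mimics table.getProperty, which returns null for an absent key.
  def getProp(key: String): String = props.getOrElse(key, null)

  val schemaString = Option(getProp("spark.sql.sources.schema.numOfParts")) match {
    case Some(numOfParts) =>
      // Read the parts in index order and stick them back together into
      // the single JSON string that was originally split.
      val parts = (0 until numOfParts.toInt).map { index =>
        Option(getProp(s"spark.sql.sources.schema.part.${index}"))
          .getOrElse(sys.error(s"Schema part ${index} is missing."))
      }
      Some(parts.mkString)
    case None => None // The schema was not defined.
  }

  println(schemaString) // Some({"type":"struct","fields":[]})
}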
@@ -133,23 +127,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         if (userSpecifiedSchema.isDefined) {
           val threshold = hive.conf.schemaStringLengthThreshold
           val schemaJsonString = userSpecifiedSchema.get.json
-          // Check if the size of the JSON string of the schema exceeds the threshold.
-          if (schemaJsonString.size > threshold) {
-            // Need to split the string.
-            val parts = schemaJsonString.grouped(threshold).toSeq
-            // First, record the total number of parts we have.
-            tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
-            // Second, write every part to table property.
-            parts.zipWithIndex.foreach {
-              case (part, index) =>
-                tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
-            }
-          } else {
-            // The length is less than the threshold, just put it in the table property.
-            tbl.setProperty("spark.sql.sources.schema.numOfParts", "1")
-            // We use spark.sql.sources.schema instead of using spark.sql.sources.schema.part.0
-            // because users may have already created data source tables in metastore.
-            tbl.setProperty("spark.sql.sources.schema", schemaJsonString)
-          }
+          // Split the JSON string.
+          val parts = schemaJsonString.grouped(threshold).toSeq
+          tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
+          parts.zipWithIndex.foreach {
+            case (part, index) =>
+              tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+          }
         }
         options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
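
Note on the write path: the else branch could be dropped because String.grouped(n) already yields a single chunk when the string is shorter than n, so the one-part case is just the degenerate case of splitting. A quick illustration (the threshold value here is made up for the example; in the commit it comes from hive.conf.schemaStringLengthThreshold):

// Illustration only: 4000 is an example value, not necessarily what
// hive.conf.schemaStringLengthThreshold returns.
val threshold = 4000
val shortJson = """{"type":"struct","fields":[]}"""
// Shorter than the threshold: grouped still yields exactly one part,
// which is why the dedicated single-part branch became unnecessary.
assert(shortJson.grouped(threshold).toSeq == Seq(shortJson))
// Longer than the chunk size: split into threshold-sized pieces.
assert("abcdef".grouped(4).toSeq == Seq("abcd", "ef"))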