Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Jan 28, 2017
1 parent 1b5ee20 commit a398036
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -839,26 +839,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
}


/**
 * Hive creates partition directories with lower-cased names, while Spark SQL renames
 * them to the case-sensitive names from `partitionColumnNames`. This method returns
 * the leftover lower-case path that Hive created, so the caller can delete it.
 * E.g. when /path/A=1/B=2/C=3 is renamed to /path/A=4/B=5/C=6, Hive first writes the
 * lower-case layout, and this method returns /path/a=4.
 *
 * @param spec the (renamed) partition spec
 * @param partitionColumnNames case-sensitive partition column names of the table
 * @param tablePath root path of the table
 * @return the shallowest extra lower-case partition path created by Hive
 */
def getExtraPartPathCreatedByHive(
    spec: TablePartitionSpec,
    partitionColumnNames: Seq[String],
    tablePath: Path): Path = {
  // Keep the columns up to and including the first one containing an upper-case
  // character: that is the shallowest directory level at which Hive's lower-cased
  // path diverges from the expected one.
  val firstMixedCaseIdx = partitionColumnNames.indexWhere(col => col.toLowerCase != col)
  val lowerCasedCols = partitionColumnNames.take(firstMixedCaseIdx + 1).map(_.toLowerCase)

  ExternalCatalogUtils.generatePartitionPath(
    lowerCasePartitionSpec(spec), lowerCasedCols, tablePath)
}

override def createPartitions(
db: String,
table: String,
Expand Down Expand Up @@ -919,21 +899,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
spec, partitionColumnNames, tablePath)
try {
tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)

// If the newSpec contains more than one depth partition, FileSystem.rename just deletes
// the leaf(i.e. wrongPath), we should check if wrongPath's parents need to be deleted.
// For example, given a newSpec 'A=1/B=2', after calling Hive's client.renamePartitions,
// the location path in FileSystem is changed to 'a=1/b=2', which is wrongPath. Then,
// although we renamed it to 'A=1/B=2', 'a=1/b=2' in FileSystem is deleted, but 'a=1'
// still exists, which we also need to delete
val delHivePartPathAfterRename = getExtraPartPathCreatedByHive(
spec,
partitionColumnNames,
tablePath)

if (delHivePartPathAfterRename != wrongPath) {
tablePath.getFileSystem(hadoopConf).delete(delHivePartPathAfterRename, true)
}
} catch {
case e: IOException => throw new SparkException(
s"Unable to rename partition path from $wrongPath to $rightPath", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ package org.apache.spark.sql.hive

import java.io.File

import org.apache.hadoop.fs.Path

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
Expand Down Expand Up @@ -484,37 +481,4 @@ class PartitionProviderCompatibilitySuite
assert(spark.sql("show partitions test").count() == 5)
}
}

test("partition path created by Hive should be deleted after renamePartitions with upper-case") {
  withTable("t", "t1", "t2") {
    // Returns whether the path `relative` exists under the given table's location.
    def pathExists(tableName: String, relative: String): Boolean = {
      val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
      val path = new Path(meta.location + "/" + relative)
      path.getFileSystem(spark.sessionState.newHadoopConf()).exists(path)
    }

    // Two upper-case partition columns: the extra lower-case dir 'a=4' left behind by
    // Hive must be gone after the rename.
    Seq((1, 2, 3)).toDF("id", "A", "B").write.partitionBy("A", "B").saveAsTable("t")
    spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)")
    assert(!pathExists("t", "a=4"), "partition path created by Hive should be deleted " +
      "after renamePartitions with upper-case")

    // Three upper-case partition columns: same expectation at the first level.
    Seq((1, 2, 3, 4)).toDF("id", "A", "B", "C").write.partitionBy("A", "B", "C").saveAsTable("t1")
    spark.sql("alter table t1 partition(A=2, B=3, C=4) rename to partition(A=5, B=6, C=7)")
    assert(!pathExists("t1", "a=5"), "partition path created by Hive should be deleted " +
      "after renamePartitions with upper-case")

    // First partition column already lower-case: its directory is the real partition
    // path and must survive, while the nested extra dir 'b=5' must be removed.
    Seq((1, 2, 3, 4)).toDF("id", "a", "B", "C").write.partitionBy("a", "B", "C").saveAsTable("t2")
    spark.sql("alter table t2 partition(a=2, B=3, C=4) rename to partition(a=4, B=5, C=6)")
    assert(pathExists("t2", "a=4"),
      "partition path of lower-case partition name should not be deleted")
    assert(!pathExists("t2", "a=4/b=5"), "partition path created by Hive should be deleted " +
      "after renamePartitions with upper-case")
  }
}
}

0 comments on commit a398036

Please sign in to comment.