diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index f38ed55f5c417..0eef1cd6965ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -20,12 +20,12 @@ package org.apache.spark.sql.catalyst.catalog import java.net.URI import java.util.Locale import java.util.concurrent.Callable -import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import scala.util.{Failure, Success, Try} import com.google.common.cache.{Cache, CacheBuilder} +import javax.annotation.concurrent.GuardedBy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StructField, StructType} + object SessionCatalog { val DEFAULT_DATABASE = "default" } @@ -60,8 +61,8 @@ class SessionCatalog( hadoopConf: Configuration, parser: ParserInterface, functionResourceLoader: FunctionResourceLoader) extends Logging { - import SessionCatalog._ import CatalogTypes.TablePartitionSpec + import SessionCatalog._ // For testing only. def this( @@ -666,7 +667,28 @@ class SessionCatalog( } else if (name.database.isDefined || !tempTables.contains(table)) { val tableNamePreprocessor = externalCatalog.getTableNamePreprocessor val tableNameInMetastore = tableNamePreprocessor(table) - val metadata = externalCatalog.getTable(db, tableNameInMetastore).withTableName(table) + + // if the table name is of CSD's proprietary form, we remove version partitioning + // information so that our custom hdfs file/dir selector path can be triggered. + // this is to help CSD to transition to partition version scheme and maintain + // backward compatibility for old queries + val metadataLookup = externalCatalog.getTable(db, tableNameInMetastore).withTableName(table) + val metadata = if (!tableNameInMetastore.equalsIgnoreCase(table) && + metadataLookup.partitionColumnNames.exists(_.equalsIgnoreCase("version"))) { + metadataLookup.copy( + partitionColumnNames = metadataLookup.partitionColumnNames.filter{ s => + !s.equalsIgnoreCase("version") + }, + schema = metadataLookup.schema.copy( + fields = metadataLookup.schema.fields.filter { s => + !s.name.equalsIgnoreCase("version") + } + ) + ) + } else { + metadataLookup + } + if (metadata.tableType == CatalogTableType.VIEW) { val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text.")) // The relation is a view, so we wrap the relation by: