#16 KE-31895 replace specific nameservice hacluster
7mming7 committed Jul 18, 2024
1 parent 937311a commit d5b7a99
Showing 7 changed files with 86 additions and 16 deletions.
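At a high level, the touched code paths (table metadata lookup, partition listing and pruning, and Hive-to-datasource conversion) now route storage locations through a new helper that swaps the hardcoded nameservice hdfs://hacluster for the value of the new spark.sql.hive.specific.fs.location setting. A minimal sketch of the substitution, assuming a hypothetical prefix hdfs://prod-ns (not a value taken from this commit):

    import java.net.URI

    // Hypothetical configured prefix; any reachable nameservice URI would do.
    val configuredPrefix = "hdfs://prod-ns"
    val stored = new URI("hdfs://hacluster/user/hive/warehouse/db.db/tbl")
    // Same textual replacement the new helper performs on storage locations.
    val rewritten = new URI(stored.toString.replaceAll("hdfs://hacluster", configuredPrefix))
    // rewritten: hdfs://prod-ns/user/hive/warehouse/db.db/tbl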
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionI
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, FSNamespaceUtils, StringUtils}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
@@ -524,7 +524,9 @@ class SessionCatalog(
   @throws[NoSuchTableException]
   def getTableMetadata(name: TableIdentifier): CatalogTable = {
     val t = getTableRawMetadata(name)
-    t.copy(schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(t.schema))
+    val l = conf.getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION)
+    t.copy(schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(t.schema),
+      storage = FSNamespaceUtils.replaceLocWithSpecPrefix(l, t.storage))
   }
 
   /**
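With the config set, getTableMetadata now hands back storage whose location has already been rewritten. A rough sketch of the observable effect, assuming an active SparkSession named spark and a hypothetical database db, table events, and prefix hdfs://prod-ns:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Assumes spark.sql.hive.specific.fs.location = hdfs://prod-ns and a table whose
    // metastore location is hdfs://hacluster/warehouse/db.db/events.
    val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("events", Some("db")))
    // meta.storage.locationUri should now read hdfs://prod-ns/warehouse/db.db/events;
    // with the config left at its null default the location comes back unchanged.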
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import java.net.URI
+
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+
+object FSNamespaceUtils {
+
+  def replaceLocWithSpecPrefix(
+      specificLocation: String,
+      storage: CatalogStorageFormat): CatalogStorageFormat = {
+    val uri = if (specificLocation != null && storage.locationUri.isDefined) {
+      val path = storage.locationUri.get
+      Option(new URI(path.toString.replaceAll("hdfs://hacluster", specificLocation)))
+    } else storage.locationUri
+    CatalogStorageFormat(uri,
+      storage.inputFormat,
+      storage.outputFormat,
+      storage.serde,
+      storage.compressed,
+      storage.properties)
+  }
+}
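A small usage sketch of the new helper; the prefix and warehouse path below are illustrative only:

    import java.net.URI
    import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
    import org.apache.spark.sql.catalyst.util.FSNamespaceUtils

    val storage = CatalogStorageFormat(
      locationUri = Some(new URI("hdfs://hacluster/warehouse/db.db/tbl")),
      inputFormat = None, outputFormat = None, serde = None,
      compressed = false, properties = Map.empty)

    // With a prefix configured, the hacluster authority is swapped out.
    val rewritten = FSNamespaceUtils.replaceLocWithSpecPrefix("hdfs://prod-ns", storage)
    // rewritten.locationUri == Some(new URI("hdfs://prod-ns/warehouse/db.db/tbl"))

    // With the config unset (null) or no location defined, the original URI is kept.
    val unchanged = FSNamespaceUtils.replaceLocWithSpecPrefix(null, storage)
    // unchanged.locationUri == storage.locationUri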
@@ -4388,6 +4388,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val HIVE_SPECIFIC_FS_LOCATION =
+    buildConf("spark.sql.hive.specific.fs.location")
+      .stringConf
+      .createWithDefault(null)
+
+  val VIEW_TRUNCATE_ENABLE =
+    buildConf("spark.sql.view-truncate-enabled")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
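Both entries are new; HIVE_SPECIFIC_FS_LOCATION defaults to null, so the rewrite stays off unless the key is set explicitly. A minimal way to turn it on when building a session (the prefix value is an assumption, not something this commit sets):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("specific-fs-location-demo")
      .enableHiveSupport()
      // Prefix that replaces the hardcoded hdfs://hacluster nameservice in table and
      // partition locations.
      .config("spark.sql.hive.specific.fs.location", "hdfs://prod-ns")
      // Also introduced in this commit; only the flag and its accessor are added,
      // the default stays false.
      .config("spark.sql.view-truncate-enabled", "true")
      .getOrCreate()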
@@ -5132,6 +5142,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   override def setOpsPrecedenceEnforced: Boolean =
     getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
+  def isViewTruncateEnable: Boolean =
+    getConf(SQLConf.VIEW_TRUNCATE_ENABLE)
+
   override def exponentLiteralAsDecimalEnabled: Boolean =
     getConf(SQLConf.LEGACY_EXPONENT_LITERAL_AS_DECIMAL_ENABLED)
 
@@ -5199,6 +5212,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def readSideCharPadding: Boolean = getConf(SQLConf.READ_SIDE_CHAR_PADDING)
 
+  def specificHiveFsLocation: String = getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION)
+
   def cliPrintHeader: Boolean = getConf(SQLConf.CLI_PRINT_HEADER)
 
   def legacyIntervalEnabled: Boolean = getConf(LEGACY_INTERVAL_ENABLED)
@@ -41,12 +41,12 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, FSNamespaceUtils}
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types.{AnsiIntervalType, ArrayType, DataType, MapType, StructType, TimestampNTZType}
 
@@ -1278,9 +1278,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val catalogTable = getTable(db, table)
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable)
     val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
+    val l = conf.get(SQLConf.HIVE_SPECIFIC_FS_LOCATION)
     val res = client.getPartitions(db, table, metaStoreSpec)
-      .map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
-    }
+      .map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap),
+        storage = FSNamespaceUtils.replaceLocWithSpecPrefix(l, part.storage))
+      }
 
     val parts = metaStoreSpec match {
       // This might be a bug of Hive: When the partition value inside the partial partition spec
@@ -1302,9 +1304,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val rawHiveTable = client.getRawHiveTable(db, table)
     val catalogTable = restoreTableMetadata(rawHiveTable.toCatalogTable)
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable)
+    val specFS = conf.get(SQLConf.HIVE_SPECIFIC_FS_LOCATION)
     val clientPrunedPartitions =
       client.getPartitionsByFilter(rawHiveTable, predicates).map { part =>
-        part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
+        part.copy(spec = restorePartitionSpec(part.spec, partColNameMap),
+          storage = FSNamespaceUtils.replaceLocWithSpecPrefix(specFS, part.storage))
         restorePartitionMetadata(part, catalogTable)
       }
     prunePartitionsByFilter(catalogTable, clientPrunedPartitions, predicates, defaultTimeZoneId)
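Both listPartitions and getPartitionsByFilter now push each partition's storage through the same helper, so pruned partitions also point at the configured namespace. A hypothetical check against the external catalog (database, table, and prefix are made up):

    // Assumes spark.sql.hive.specific.fs.location = hdfs://prod-ns and partitions stored
    // under hdfs://hacluster/warehouse/db.db/events/dt=... in the metastore.
    val partLocations = spark.sessionState.catalog.externalCatalog
      .listPartitions("db", "events")
      .flatMap(_.storage.locationUri)
    // Every URI should now start with hdfs://prod-ns rather than hdfs://hacluster.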
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.catalyst.util.FSNamespaceUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.internal.SQLConf
@@ -130,26 +131,34 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   def convert(relation: HiveTableRelation, isWrite: Boolean): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
 
+    val specFS = sessionState.conf.getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION)
+    val specCatalog = FSNamespaceUtils.replaceLocWithSpecPrefix(specFS, relation.tableMeta.storage)
+    val specTableMeta = relation.tableMeta.copy(storage = specCatalog)
+    val specRelation = if (specFS != null && specCatalog.locationUri.isDefined) {
+      relation.copy(tableMeta = specTableMeta)
+    } else relation
+
     // Consider table and storage properties. For properties existing in both sides, storage
     // properties will supersede table properties.
     if (serde.contains("parquet")) {
-      val options = relation.tableMeta.properties.filterKeys(isParquetProperty).toMap ++
-        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
+      val options = specRelation.tableMeta.properties.filterKeys(isParquetProperty).toMap ++
+        specRelation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
         SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-      convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet", isWrite)
+      convertToLogicalRelation(specRelation, options,
+        classOf[ParquetFileFormat], "parquet", isWrite)
     } else {
-      val options = relation.tableMeta.properties.filterKeys(isOrcProperty).toMap ++
-        relation.tableMeta.storage.properties
+      val options = specRelation.tableMeta.properties.filterKeys(isOrcProperty).toMap ++
+        specRelation.tableMeta.storage.properties
       if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
         convertToLogicalRelation(
-          relation,
+          specRelation,
           options,
           classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
           "orc",
           isWrite)
       } else {
         convertToLogicalRelation(
-          relation,
+          specRelation,
           options,
           classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
           "orc",
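Because the Parquet/ORC conversion path now builds the LogicalRelation from the rewritten relation, reads that go through the converted file-source scan also hit the configured namespace. A rough end-to-end sanity check under the same hypothetical setup as above:

    // Assumes spark.sql.hive.specific.fs.location = hdfs://prod-ns and the default
    // spark.sql.hive.convertMetastoreParquet = true for a Parquet-backed Hive table.
    spark.sql("SELECT count(*) FROM db.events").show()
    // The file scan underneath should list files under hdfs://prod-ns/... instead of
    // the hardcoded hdfs://hacluster/... locations recorded in the metastore.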
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.util.Locale
 
 import org.apache.hadoop.fs.{FileSystem, Path}
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
@@ -186,7 +186,7 @@ class HadoopTableReader(
               path.toString + tails
             }
 
-            val partPath = partition.getDataLocation
+            var partPath = partition.getDataLocation
             val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size()
             val pathPatternStr = getPathPatternByPath(partNum, partPath)
             if (!pathPatternSet.contains(pathPatternStr)) {
@@ -201,7 +201,7 @@
     val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
       .map { case (partition, partDeserializer) =>
         val partDesc = Utilities.getPartitionDescFromTableDesc(tableDesc, partition, true)
-        val partPath = partition.getDataLocation
+        var partPath = partition.getDataLocation
         val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
         // Get partition field info
         val partSpec = partDesc.getPartSpec
