diff --git a/conf/spark-env.sh b/conf/spark-env.sh
index c269191b9ae24..9f62e3e8357ea 100755
--- a/conf/spark-env.sh
+++ b/conf/spark-env.sh
@@ -170,3 +170,6 @@ export SPARK_WORKER_DIR=$SPARK_HOME/tmp

 #UI
 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djava.library.path=$SPARK_MAPR_HOME/lib"
+export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Djava.library.path=$SPARK_MAPR_HOME/lib"
+export SPARK_MASTER_HOST=$(hostname --fqdn)
+export SPARK_MASTER_IP=$(hostname --fqdn)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4903421f9063f..9b6f9918853ec 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -300,6 +300,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     } else {
       logInfo("Skip exiting executor since it's been already asked to exit before.")
     }
+    self.send(Shutdown)
   }

   private def decommissionSelf(): Unit = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index e42b9b94d4809..7d10c0b22c59b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -664,6 +664,12 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional

+  val MAPR_SPARK_EXTRACONF_SECRET_NAME =
+    ConfigBuilder("spark.mapr.extraconf.secret")
+      .doc("Name of the secret with Spark extra configurations that will be added to sparkConf")
+      .stringConf
+      .createOptional
+
   val MAPR_CLUSTER_CONFIGMAP =
     ConfigBuilder("spark.mapr.cluster.configMap")
       .doc("Name of the mapr cluster config map")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 9fbf5cadffb27..97c1f701e6c4f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -126,6 +126,7 @@ private[spark] object Constants {
   val MAPR_USER_TICKET_SUBPATH = "CONTAINER_TICKET"
   val MAPR_USER_SECRET_MOUNT_PATH = "/tmp/usersecret"
   val MAPR_USER_TICKET_MOUNT_PATH = s"$MAPR_USER_SECRET_MOUNT_PATH/$MAPR_USER_TICKET_SUBPATH"
+  val MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH = "/opt/mapr/kubernetes/spark_secrets"

   val ENV_MAPR_METRICSFILE_LOCATION = "MAPR_METRICSFILE_LOCATION"
   val MAPR_METRICS_TICKET_SUBPATH = "maprmetricsticket"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MaprConfigFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MaprConfigFeatureStep.scala
index 5ec5236ed1157..a0640d73c4051 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MaprConfigFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MaprConfigFeatureStep.scala
@@ -42,6 +42,7 @@ private[spark] class MaprConfigFeatureStep(conf: KubernetesConf)
     applySSSDSecret(podBuilder, containerBuilder)
     applySSHSecret(podBuilder, containerBuilder)
     applyClientSecret(podBuilder, containerBuilder)
+    applySparkExtraConfigs(podBuilder, containerBuilder)

     SparkPod(podBuilder.build(), containerBuilder.build())
   }
@@ -130,6 +131,32 @@ private[spark] class MaprConfigFeatureStep(conf: KubernetesConf)
         .endVolumeMount()
   }

+  private def applySparkExtraConfigs(podBuilder: PodBuilder, containerBuilder: ContainerBuilder): Unit = {
+    val confSecretName = sparkConf.get(MAPR_SPARK_EXTRACONF_SECRET_NAME).getOrElse("")
+
+    if (confSecretName.isEmpty) {
+      return
+    }
+
+    val confSecretVolumeName = "spark-extraconf-secret"
+
+    podBuilder.editOrNewSpec()
+      .addNewVolume()
+        .withName(confSecretVolumeName)
+        .withNewSecret()
+          .withSecretName(confSecretName)
+        .endSecret()
+      .endVolume()
+      .endSpec()
+
+    containerBuilder
+      .addNewVolumeMount()
+        .withName(confSecretVolumeName)
+        .withMountPath(MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH)
+      .endVolumeMount()
+  }
+
+
   private def applyUserSecret(podBuilder: PodBuilder, containerBuilder: ContainerBuilder): Unit = {
     val userSecretNameConfig = sparkConf.get(MAPR_USER_SECRET)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index ad44048ce9c6f..ddf4a90be1a48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -89,7 +89,12 @@ class JDBCOptions(
       if (subquery.isEmpty) {
         throw QueryExecutionErrors.emptyOptionError(JDBC_QUERY_STRING)
       } else {
-        s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
+        val runQueryAsIs = parameters.getOrElse(JDBC_USE_RAW_QUERY, "false").toBoolean
+        if (runQueryAsIs) {
+          s"${subquery}"
+        } else {
+          s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
+        }
       }
     }

@@ -258,6 +263,7 @@ object JDBCOptions {
   val JDBC_URL = newOption("url")
   val JDBC_TABLE_NAME = newOption("dbtable")
   val JDBC_QUERY_STRING = newOption("query")
+  val JDBC_USE_RAW_QUERY = newOption("useRawQuery")
   val JDBC_DRIVER_CLASS = newOption("driver")
   val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
   val JDBC_LOWER_BOUND = newOption("lowerBound")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 0f1a1b6dc667b..d4dd43523046c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -23,13 +23,15 @@ import scala.math.BigDecimal.RoundingMode
 import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.connector.expressions.SortOrder
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.{JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES, JDBC_USE_RAW_QUERY}
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.JdbcDialects
@@ -239,11 +241,23 @@ private[sql] object JDBCRelation extends Logging {
    * @return resolved Catalyst schema of a JDBC table
    */
  def getSchema(resolver: Resolver, jdbcOptions: JDBCOptions): StructType = {
-    val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
-    jdbcOptions.customSchema match {
-      case Some(customSchema) => JdbcUtils.getCustomSchema(
-        tableSchema, customSchema, resolver)
-      case None => tableSchema
+    val runQueryAsIs = jdbcOptions.parameters.getOrElse(JDBC_USE_RAW_QUERY, "false").toBoolean
+    if (runQueryAsIs) {
+      val customSchema = jdbcOptions.parameters.get(JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES)
+      val newSchema = jdbcOptions.customSchema match {
+        case Some(customSchema) => CatalystSqlParser.parseTableSchema(customSchema)
+        case None => throw new IllegalArgumentException(
+          s"Field $JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES is mandatory when using $JDBC_USE_RAW_QUERY")
+      }
+      logInfo(s"Option $JDBC_USE_RAW_QUERY is enabled, parsed $newSchema from the field $JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES with value $customSchema")
+      newSchema
+    } else {
+      val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
+      jdbcOptions.customSchema match {
+        case Some(customSchema) => JdbcUtils.getCustomSchema(
+          tableSchema, customSchema, resolver)
+        case None => tableSchema
+      }
     }
   }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 29916e153c35b..34b6c29e9d56a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.hive.maprdb.json.input.HiveMapRDBJsonInputFormat
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
@@ -322,11 +321,10 @@ class HadoopTableReader(
    */
  private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
    val inputFormatClazz = localTableDesc.getInputFileFormatClass
-    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)
-      && !inputFormatClazz.isAssignableFrom(classOf[HiveMapRDBJsonInputFormat])) {
-      createNewHadoopRDD(localTableDesc, inputPathStr)
-    } else {
+    if (classOf[oldInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
      createOldHadoopRDD(localTableDesc, inputPathStr)
+    } else {
+      createNewHadoopRDD(localTableDesc, inputPathStr)
    }
  }
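
Usage note (sketch): with the new spark.mapr.extraconf.secret option above, a submission could opt into the extra-configuration mount roughly as follows; the secret name and app name are illustrative assumptions, and the secret itself must already exist in the job namespace.

import org.apache.spark.SparkConf

// Minimal sketch, assuming a pre-created Kubernetes secret named "spark-extra-conf".
// When spark.mapr.extraconf.secret is set, MaprConfigFeatureStep adds a volume for that
// secret and mounts it at MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH (/opt/mapr/kubernetes/spark_secrets).
val conf = new SparkConf()
  .setAppName("extraconf-sketch")                          // illustrative app name
  .set("spark.mapr.extraconf.secret", "spark-extra-conf")  // new option from Config.scala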
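
Usage note (sketch): the new useRawQuery JDBC option sends the query option to the database verbatim instead of wrapping it in a SPARK_GEN_SUBQ_n subquery, and getSchema then requires customSchema because the table is no longer resolved via JDBCRDD.resolveTable. The URL, query text and column types below are illustrative assumptions.

import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a reachable JDBC endpoint and a procedure that returns rows.
val spark = SparkSession.builder().appName("raw-query-sketch").getOrCreate()
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/sales")              // assumed endpoint
  .option("query", "CALL monthly_report(2023)")                  // sent to the database as-is
  .option("useRawQuery", "true")                                 // new option: skip the subquery wrapper
  .option("customSchema", "month STRING, total DECIMAL(18,2)")   // mandatory in raw-query mode
  .load()
df.show()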