MapR [SPARK-979] Backport all needed 3.1.2 EEP commits to 3.2 branch (apache#913)

* MapR [SPARK-953] Investigate and add all needed changes for Spark services (apache#905)

* [EZSPA-347] Find a way to pass sensitive configs in secure manner (apache#907)

* MapR [SPARK-961] Spark job can't be properly killed using yarn API or CLI (apache#908)

* MapR [SPARK-962] MSSQL can not handle SQL syntax which is used in Spark (apache#909)

* MapR [SPARK-963] select from hbase table which was created via hive fails (apache#910)

Co-authored-by: Dmitry Popkov <[email protected]>
Co-authored-by: Andrew Khalymon <[email protected]>
3 people authored and ekrivokonmapr committed Nov 6, 2023
1 parent a95ba07 commit 33c4d45
Showing 8 changed files with 68 additions and 12 deletions.
3 changes: 3 additions & 0 deletions conf/spark-env.sh
@@ -170,3 +170,6 @@ export SPARK_WORKER_DIR=$SPARK_HOME/tmp

#UI
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djava.library.path=$SPARK_MAPR_HOME/lib"
export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Djava.library.path=$SPARK_MAPR_HOME/lib"
export SPARK_MASTER_HOST=$(hostname --fqdn)
export SPARK_MASTER_IP=$(hostname --fqdn)
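
Both new exports resolve to the machine's fully qualified domain name; SPARK_MASTER_HOST is the current setting and SPARK_MASTER_IP is its legacy spelling kept for older start scripts. The host chosen here becomes the host part of the standalone master URL that workers and spark-submit target. A minimal sketch of how that URL is formed (the object name and the default port 7077 are assumptions of this sketch, not taken from the diff):

object MasterUrlSketch {
  def main(args: Array[String]): Unit = {
    // Host advertised by the exports above; falls back to localhost if unset.
    val masterHost = sys.env.getOrElse("SPARK_MASTER_HOST", "localhost")
    // 7077 is the conventional default port of the standalone master.
    val masterUrl = s"spark://$masterHost:7077"
    println(s"Workers and spark-submit would connect to: $masterUrl")
  }
}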
@@ -300,6 +300,7 @@ private[spark] class CoarseGrainedExecutorBackend(
} else {
logInfo("Skip exiting executor since it's been already asked to exit before.")
}
self.send(Shutdown)
}

private def decommissionSelf(): Unit = {
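The single added line makes the executor backend also send a Shutdown message to itself, so the actual teardown is handled on the endpoint's own message loop rather than at the call site that requested the exit; presumably this is part of the SPARK-961 fix for jobs that could not be killed through the YARN API or CLI. Below is a self-contained sketch of that "notify self, stop on the loop" pattern; Work, Shutdown and the queue-based loop are illustrative stand-ins, not Spark's RpcEndpoint classes.

import java.util.concurrent.LinkedBlockingQueue

object ShutdownPatternSketch {
  sealed trait Message
  case object Work extends Message
  case object Shutdown extends Message

  def main(args: Array[String]): Unit = {
    val inbox = new LinkedBlockingQueue[Message]()
    inbox.put(Work)
    inbox.put(Shutdown) // analogous to self.send(Shutdown) in the diff above

    var running = true
    while (running) {
      inbox.take() match {
        case Work =>
          println("processing work")
        case Shutdown =>
          // cleanup runs on the message loop, not where the exit was requested
          println("cleaning up and stopping the loop")
          running = false
      }
    }
  }
}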
@@ -664,6 +664,12 @@ private[spark] object Config extends Logging {
      .stringConf
      .createOptional

  val MAPR_SPARK_EXTRACONF_SECRET_NAME =
    ConfigBuilder("spark.mapr.extraconf.secret")
      .doc("Name of the secret with Spark extra configurations that will be added to sparkConf")
      .stringConf
      .createOptional

  val MAPR_CLUSTER_CONFIGMAP =
    ConfigBuilder("spark.mapr.cluster.configMap")
      .doc("Name of the mapr cluster config map")
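The new spark.mapr.extraconf.secret entry is declared through ConfigBuilder like any other Spark setting, so it can be supplied with --conf or programmatically. A hedged usage sketch; the secret name my-spark-extra-conf is a placeholder, and the named secret is expected to exist in the namespace where the driver and executor pods run:

import org.apache.spark.SparkConf

object ExtraConfSecretSketch {
  def main(args: Array[String]): Unit = {
    // Equivalent to passing --conf spark.mapr.extraconf.secret=my-spark-extra-conf to spark-submit.
    val conf = new SparkConf()
      .set("spark.mapr.extraconf.secret", "my-spark-extra-conf")
    println(conf.get("spark.mapr.extraconf.secret"))
  }
}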
@@ -126,6 +126,7 @@ private[spark] object Constants {
val MAPR_USER_TICKET_SUBPATH = "CONTAINER_TICKET"
val MAPR_USER_SECRET_MOUNT_PATH = "/tmp/usersecret"
val MAPR_USER_TICKET_MOUNT_PATH = s"$MAPR_USER_SECRET_MOUNT_PATH/$MAPR_USER_TICKET_SUBPATH"
val MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH = "/opt/mapr/kubernetes/spark_secrets"

val ENV_MAPR_METRICSFILE_LOCATION = "MAPR_METRICSFILE_LOCATION"
val MAPR_METRICS_TICKET_SUBPATH = "maprmetricsticket"
@@ -42,6 +42,7 @@ private[spark] class MaprConfigFeatureStep(conf: KubernetesConf)
applySSSDSecret(podBuilder, containerBuilder)
applySSHSecret(podBuilder, containerBuilder)
applyClientSecret(podBuilder, containerBuilder)
applySparkExtraConfigs(podBuilder, containerBuilder)

SparkPod(podBuilder.build(), containerBuilder.build())
}
@@ -130,6 +131,32 @@ private[spark] class MaprConfigFeatureStep(conf: KubernetesConf)
.endVolumeMount()
}

  private def applySparkExtraConfigs(podBuilder: PodBuilder, containerBuilder: ContainerBuilder): Unit = {
    // The secret name is optional; fall back to an empty string so this step is a no-op
    // when spark.mapr.extraconf.secret is not set (calling .get on the Option would throw).
    val confSecretName = sparkConf.get(MAPR_SPARK_EXTRACONF_SECRET_NAME).getOrElse("")

    if (confSecretName.isEmpty) {
      return
    }

    val confSecretVolumeName = "spark-extraconf-secret"

    podBuilder.editOrNewSpec()
      .addNewVolume()
        .withName(confSecretVolumeName)
        .withNewSecret()
          .withSecretName(confSecretName)
        .endSecret()
      .endVolume()
      .endSpec()

    containerBuilder
      .addNewVolumeMount()
        .withName(confSecretVolumeName)
        .withMountPath(MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH)
      .endVolumeMount()
  }


private def applyUserSecret(podBuilder: PodBuilder, containerBuilder: ContainerBuilder): Unit = {
val userSecretNameConfig = sparkConf.get(MAPR_USER_SECRET)

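The feature step above only mounts the named secret at MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH; the code that folds the mounted entries into the running configuration is not part of this diff. Purely as a hypothetical sketch of what such loading could look like, assuming each file in the directory holds one entry (file name as key, contents as value); the loadExtraConf name and the layout convention are assumptions:

import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import org.apache.spark.SparkConf

object ExtraConfLoaderSketch {
  // Hypothetical: treat every regular file in the mounted secret directory as one
  // config entry and apply it to the given SparkConf.
  def loadExtraConf(conf: SparkConf,
                    dir: String = "/opt/mapr/kubernetes/spark_secrets"): SparkConf = {
    val root = Paths.get(dir)
    if (Files.isDirectory(root)) {
      Files.list(root).iterator().asScala
        .filter(p => Files.isRegularFile(p))
        .foreach { p =>
          val key = p.getFileName.toString
          val value = new String(Files.readAllBytes(p), "UTF-8").trim
          conf.set(key, value)
        }
    }
    conf
  }
}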
@@ -89,7 +89,12 @@ class JDBCOptions(
if (subquery.isEmpty) {
throw QueryExecutionErrors.emptyOptionError(JDBC_QUERY_STRING)
} else {
s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
val runQueryAsIs = parameters.getOrElse(JDBC_USE_RAW_QUERY, "false").toBoolean
if (runQueryAsIs) {
s"${subquery}"
} else {
s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
}
}
}

@@ -258,6 +263,7 @@ object JDBCOptions {
val JDBC_URL = newOption("url")
val JDBC_TABLE_NAME = newOption("dbtable")
val JDBC_QUERY_STRING = newOption("query")
val JDBC_USE_RAW_QUERY = newOption("useRawQuery")
val JDBC_DRIVER_CLASS = newOption("driver")
val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
val JDBC_LOWER_BOUND = newOption("lowerBound")
@@ -23,13 +23,15 @@ import scala.math.BigDecimal.RoundingMode
import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.connector.expressions.SortOrder
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.{JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES, JDBC_USE_RAW_QUERY}
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.JdbcDialects
@@ -239,11 +241,23 @@ private[sql] object JDBCRelation extends Logging {
* @return resolved Catalyst schema of a JDBC table
*/
def getSchema(resolver: Resolver, jdbcOptions: JDBCOptions): StructType = {
val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
jdbcOptions.customSchema match {
case Some(customSchema) => JdbcUtils.getCustomSchema(
tableSchema, customSchema, resolver)
case None => tableSchema
val runQueryAsIs = jdbcOptions.parameters.getOrElse(JDBC_USE_RAW_QUERY, "false").toBoolean
if (runQueryAsIs) {
val customSchema = jdbcOptions.parameters.get(JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES)
val newSchema = jdbcOptions.customSchema match {
case Some(customSchema) => CatalystSqlParser.parseTableSchema(customSchema)
case None => throw new IllegalArgumentException(
s"Field $JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES is mandatory when using $JDBC_USE_RAW_QUERY")
}
logInfo(s"Option $JDBC_USE_RAW_QUERY is enabled, parsed $newSchema from the field $JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES with value $customSchema")
newSchema
} else {
val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
jdbcOptions.customSchema match {
case Some(customSchema) => JdbcUtils.getCustomSchema(
tableSchema, customSchema, resolver)
case None => tableSchema
}
}
}

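Taken together, the JDBC changes let the query text be handed to the database unmodified, skipping the SPARK_GEN_SUBQ_* subquery wrapping that MSSQL rejects for some statements, at the cost of declaring the result schema by hand. A hedged usage sketch; the connection URL, the CTE query, and the column types are placeholders, and the SQL Server JDBC driver is assumed to be on the classpath:

import org.apache.spark.sql.SparkSession

object RawQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("raw-query-sketch").getOrCreate()

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:sqlserver://mssql-host:1433;databaseName=testdb")  // placeholder
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("query", "WITH t AS (SELECT id, total FROM dbo.orders) SELECT id, total FROM t")
      .option("useRawQuery", "true")                          // new option: send the query text as-is
      .option("customSchema", "id INT, total DECIMAL(18,2)")  // mandatory with useRawQuery, per getSchema above
      .load()

    // With useRawQuery the schema comes from customSchema rather than a round trip
    // to the database, so this prints the declared columns.
    df.printSchema()
  }
}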
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.maprdb.json.input.HiveMapRDBJsonInputFormat
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
@@ -322,11 +321,10 @@ class HadoopTableReader(
*/
private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
val inputFormatClazz = localTableDesc.getInputFileFormatClass
if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)
&& !inputFormatClazz.isAssignableFrom(classOf[HiveMapRDBJsonInputFormat])) {
createNewHadoopRDD(localTableDesc, inputPathStr)
} else {
if (classOf[oldInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
createOldHadoopRDD(localTableDesc, inputPathStr)
} else {
createNewHadoopRDD(localTableDesc, inputPathStr)
}
}

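After the change, the reader dispatches purely on which Hadoop InputFormat API the table's format implements: formats implementing the old org.apache.hadoop.mapred API go through createOldHadoopRDD, everything else goes through createNewHadoopRDD, and the HiveMapRDBJsonInputFormat special case is gone. A small sketch of that isAssignableFrom test using stock Hadoop text formats:

import org.apache.hadoop.mapred.{InputFormat => OldInputFormat, TextInputFormat => OldTextInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

object InputFormatDispatchSketch {
  // Mirrors the dispatch rule above: old-API formats take the old HadoopRDD path,
  // everything else takes the new one.
  def usesOldApi(clazz: Class[_]): Boolean =
    classOf[OldInputFormat[_, _]].isAssignableFrom(clazz)

  def main(args: Array[String]): Unit = {
    println(usesOldApi(classOf[OldTextInputFormat])) // true  -> createOldHadoopRDD
    println(usesOldApi(classOf[NewTextInputFormat])) // false -> createNewHadoopRDD
  }
}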
