Merge remote-tracking branch 'upstream/master' into SPARK-34542
wangyum committed Mar 21, 2021
2 parents 0ff5114 + 94fd6cb commit 145dba0
Showing 36 changed files with 539 additions and 148 deletions.
6 changes: 6 additions & 0 deletions conf/log4j.properties.template
@@ -38,3 +38,9 @@ log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter
log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message
log4j.appender.console.filter.1.AcceptOnMatch=false
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -19,9 +19,10 @@ package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partitioner, RangePartitioner}
import org.apache.spark.{InterruptibleIterator, Partitioner, RangePartitioner, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.util.collection.ExternalSorter

/**
* Extra functions available on RDDs of (key, value) pairs where the key is sortable through
@@ -73,7 +74,16 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
* because it can push the sorting down into the shuffle machinery.
*/
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
val sorter = new ExternalSorter[K, V, V](context, None, None, Some(ordering))
new InterruptibleIterator(context,
sorter.insertAllAndUpdateMetrics(iter).asInstanceOf[Iterator[(K, V)]])
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
}

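For context, a minimal sketch of how the optimized path is reached from user code (assuming a running `SparkContext` named `sc`; the data and partitioner are illustrative):

```scala
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(2)

// reduceByKey already hash-partitions its output by key.
val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .reduceByKey(partitioner, _ + _)

// With this change, the call below sees that `counts` is already partitioned
// by an equal partitioner, so it only sorts each partition in place with an
// ExternalSorter instead of scheduling another shuffle.
val sorted = counts.repartitionAndSortWithinPartitions(partitioner)
```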
@@ -120,20 +120,12 @@ private[spark] class BlockStoreShuffleReader[K, C](
}

// Sort the output if there is a sort ordering defined.
val resultIter = dep.keyOrdering match {
val resultIter: Iterator[Product2[K, C]] = dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data.
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was finished/cancelled.
context.addTaskCompletionListener[Unit](_ => {
sorter.stop()
})
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
sorter.insertAllAndUpdateMetrics(aggregatedIter)
case None =>
aggregatedIter
}
@@ -32,7 +32,7 @@ import org.apache.spark.serializer._
import org.apache.spark.shuffle.ShufflePartitionPairsWriter
import org.apache.spark.shuffle.api.{ShuffleMapOutputWriter, ShufflePartitionWriter}
import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter, ShuffleBlockId}
import org.apache.spark.util.{Utils => TryUtils}
import org.apache.spark.util.{CompletionIterator, Utils => TryUtils}

/**
* Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner
@@ -672,6 +672,22 @@ private[spark] class ExternalSorter[K, V, C](
partitionedIterator.flatMap(pair => pair._2)
}

/**
* Inserts all records, updates related task metrics, and returns a completion iterator
* over all the data written to this object, aggregated by our aggregator.
* On task completion (success, failure, or cancellation), it releases resources by
* calling `stop()`.
*/
def insertAllAndUpdateMetrics(records: Iterator[Product2[K, V]]): Iterator[Product2[K, C]] = {
insertAll(records)
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was finished/cancelled.
context.addTaskCompletionListener[Unit](_ => stop())
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](iterator, stop())
}

/**
* TODO(SPARK-28764): remove this, as this is only used by UnsafeRowSerializerSuite in the SQL
* project. We should figure out an alternative way to test that so that we can remove this
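Since `ExternalSorter` is `private[spark]`, the new helper is only reachable from Spark's own code; the sketch below is a hypothetical internal wrapper (not part of this commit) showing the call pattern it enables, assuming it runs inside a task so `TaskContext.get()` is non-null:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.util.collection.ExternalSorter

// Hypothetical internal helper: sort one partition's records by key,
// spilling to disk when needed. Metric updates and sorter cleanup are
// handled entirely by insertAllAndUpdateMetrics.
def sortPartitionByKey[K: Ordering, V](records: Iterator[(K, V)]): Iterator[Product2[K, V]] = {
  val sorter = new ExternalSorter[K, V, V](
    TaskContext.get(), None, None, Some(implicitly[Ordering[K]]))
  sorter.insertAllAndUpdateMetrics(records)
}
```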
14 changes: 14 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -860,6 +860,20 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
assert(partitions(1) === Seq((1, 3), (3, 8), (3, 8)))
}

test("SPARK-32384: repartitionAndSortWithinPartitions without shuffle") {
val data = sc.parallelize(Seq((0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)), 2)

val partitioner = new HashPartitioner(2)
val agged = data.reduceByKey(_ + _, 2)
assert(agged.partitioner == Some(partitioner))

val sorted = agged.repartitionAndSortWithinPartitions(partitioner)
assert(sorted.partitioner == Some(partitioner))

assert(sorted.dependencies.nonEmpty &&
sorted.dependencies.forall(_.isInstanceOf[OneToOneDependency[_]]))
}

test("cartesian on empty RDD") {
val a = sc.emptyRDD[Int]
val b = sc.parallelize(1 to 3)
4 changes: 2 additions & 2 deletions docs/ml-features.md
@@ -1497,8 +1497,8 @@ for more details on the API.

## Imputer

The `Imputer` estimator completes missing values in a dataset, either using the mean or the
median of the columns in which the missing values are located. The input columns should be of
The `Imputer` estimator completes missing values in a dataset, using the mean, median or mode
of the columns in which the missing values are located. The input columns should be of
numeric type. Currently `Imputer` does not support categorical features and possibly
creates incorrect values for columns containing categorical features. Imputer can impute custom values
other than 'NaN' by `.setMissingValue(custom_value)`. For example, `.setMissingValue(0)` will impute
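A short Scala sketch of the newly documented `mode` strategy (assuming an active `SparkSession` named `spark`; column names and data are illustrative):

```scala
import org.apache.spark.ml.feature.Imputer

val df = spark.createDataFrame(Seq(
  (1.0, Double.NaN),
  (2.0, 3.0),
  (2.0, 3.0),
  (Double.NaN, 3.0)
)).toDF("a", "b")

val imputer = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("a_imputed", "b_imputed"))
  .setStrategy("mode")  // most frequent value per column; "mean" and "median" also work

// The NaN in "a" becomes 2.0 and the NaN in "b" becomes 3.0 (the per-column modes).
imputer.fit(df).transform(df).show()
```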
9 changes: 8 additions & 1 deletion docs/running-on-kubernetes.md
@@ -256,7 +256,14 @@ To use a secret through an environment variable use the following options to the
Kubernetes allows defining pods from [template files](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates).
Spark users can similarly use template files to define the driver or executor pod configurations that Spark configurations do not support.
To do so, specify the spark properties `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile`
to point to local files accessible to the `spark-submit` process. To allow the driver pod access the executor pod template
to point to files accessible to the `spark-submit` process.

```
--conf spark.kubernetes.driver.podTemplateFile=s3a://bucket/driver.yml
--conf spark.kubernetes.executor.podTemplateFile=s3a://bucket/executor.yml
```

To allow the driver pod to access the executor pod template
file, the file will be automatically mounted onto a volume in the driver pod when it's created.
Spark does not do any validation after unmarshalling these template files and relies on the Kubernetes API server for validation.

@@ -97,7 +97,7 @@ private[feature] trait ImputerParams extends Params with HasInputCol with HasInp
}

/**
* Imputation estimator for completing missing values, either using the mean or the median
* Imputation estimator for completing missing values, using the mean, median or mode
* of the columns in which the missing values are located. The input columns should be of
* numeric type. Currently Imputer does not support categorical features
* (SPARK-15041) and possibly creates incorrect values for a categorical feature.
2 changes: 1 addition & 1 deletion python/pyspark/ml/feature.py
@@ -1536,7 +1536,7 @@ def getMissingValue(self):
@inherit_doc
class Imputer(JavaEstimator, _ImputerParams, JavaMLReadable, JavaMLWritable):
"""
Imputation estimator for completing missing values, either using the mean or the median
Imputation estimator for completing missing values, using the mean, median or mode
of the columns in which the missing values are located. The input columns should be of
numeric type. Currently Imputer does not support categorical features and
possibly creates incorrect values for a categorical feature.
@@ -35,6 +35,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.resource.ResourceUtils
import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.DependencyUtils.downloadFile
import org.apache.spark.util.Utils.getHadoopFileSystem

private[spark] object KubernetesUtils extends Logging {
@@ -81,9 +82,13 @@

def loadPodFromTemplate(
kubernetesClient: KubernetesClient,
templateFile: File,
containerName: Option[String]): SparkPod = {
templateFileName: String,
containerName: Option[String],
conf: SparkConf): SparkPod = {
try {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val localFile = downloadFile(templateFileName, Utils.createTempDir(), conf, hadoopConf)
val templateFile = new File(new java.net.URI(localFile).getPath)
val pod = kubernetesClient.pods().load(templateFile).get()
selectSparkContainer(pod, containerName)
} catch {
@@ -22,9 +22,12 @@ import java.nio.charset.StandardCharsets
import com.google.common.io.Files
import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, HasMetadata, PodBuilder}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.util.DependencyUtils.downloadFile
import org.apache.spark.util.Utils

private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
@@ -75,7 +78,10 @@ private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf)
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (hasTemplate) {
val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf.sparkConf)
val uri = downloadFile(podTemplateFile, Utils.createTempDir(), conf.sparkConf, hadoopConf)
val file = new java.net.URI(uri).getPath
val podTemplateString = Files.toString(new File(file), StandardCharsets.UTF_8)
Seq(new ConfigMapBuilder()
.withNewMetadata()
.withName(configmapName)
@@ -16,8 +16,6 @@
*/
package org.apache.spark.deploy.k8s.submit

import java.io.File

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.deploy.k8s._
@@ -33,8 +31,9 @@ private[spark] class KubernetesDriverBuilder {
.map { file =>
KubernetesUtils.loadPodFromTemplate(
client,
new File(file),
conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
file,
conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME),
conf.sparkConf)
}
.getOrElse(SparkPod.initialPod())

@@ -86,8 +86,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
KubernetesUtils.loadPodFromTemplate(
kubernetesClient,
new File(sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get),
sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get,
sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME),
sc.conf)
}

val schedulerExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
@@ -16,8 +16,6 @@
*/
package org.apache.spark.scheduler.cluster.k8s

import java.io.File

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SecurityManager
@@ -37,8 +35,9 @@ private[spark] class KubernetesExecutorBuilder {
.map { file =>
KubernetesUtils.loadPodFromTemplate(
client,
new File(file),
conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
file,
conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME),
conf.sparkConf)
}
.getOrElse(SparkPod.initialPod())

@@ -3114,56 +3114,6 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}

/**
* Pulls out nondeterministic expressions from LogicalPlan which is not Project or Filter,
* put them into an inner Project and finally project them away at the outer Project.
*/
object PullOutNondeterministic extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case p if !p.resolved => p // Skip unresolved nodes.
case p: Project => p
case f: Filter => f

case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
a.transformExpressions { case e =>
nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
}.copy(child = newChild)

// Don't touch collect metrics. Top-level metrics are not supported (check analysis will fail)
// and we want to retain them inside the aggregate functions.
case m: CollectMetrics => m

// todo: It's hard to write a general rule to pull out nondeterministic expressions
// from LogicalPlan, currently we only do it for UnaryNode which has same output
// schema with its child.
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
val nondeterToAttr = getNondeterToAttr(p.expressions)
val newPlan = p.transformExpressions { case e =>
nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
}
val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
Project(p.output, newPlan.withNewChildren(newChild :: Nil))
}

private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
exprs.filterNot(_.deterministic).flatMap { expr =>
val leafNondeterministic = expr.collect {
case n: Nondeterministic => n
case udf: UserDefinedExpression if !udf.deterministic => udf
}
leafNondeterministic.distinct.map { e =>
val ne = e match {
case n: NamedExpression => n
case _ => Alias(e, "_nondeterministic")()
}
e -> ne
}
}.toMap
}
}

/**
* Set the seed for random number generation.
*/
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

/**
* Pulls out nondeterministic expressions from LogicalPlan which is not Project or Filter,
* put them into an inner Project and finally project them away at the outer Project.
*/
object PullOutNondeterministic extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp applyLocally

val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
case p if !p.resolved => p // Skip unresolved nodes.
case p: Project => p
case f: Filter => f

case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
a.transformExpressions { case e =>
nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
}.copy(child = newChild)

// Don't touch collect metrics. Top-level metrics are not supported (check analysis will fail)
// and we want to retain them inside the aggregate functions.
case m: CollectMetrics => m

// todo: It's hard to write a general rule to pull out nondeterministic expressions
// from LogicalPlan, currently we only do it for UnaryNode which has same output
// schema with its child.
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
val nondeterToAttr = getNondeterToAttr(p.expressions)
val newPlan = p.transformExpressions { case e =>
nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
}
val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
Project(p.output, newPlan.withNewChildren(newChild :: Nil))
}

private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
exprs.filterNot(_.deterministic).flatMap { expr =>
val leafNondeterministic = expr.collect {
case n: Nondeterministic => n
case udf: UserDefinedExpression if !udf.deterministic => udf
}
leafNondeterministic.distinct.map { e =>
val ne = e match {
case n: NamedExpression => n
case _ => Alias(e, "_nondeterministic")()
}
e -> ne
}
}.toMap
}
}
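To illustrate what the relocated (otherwise unchanged) rule does, here is a hedged sketch of a query shape it targets (assuming a `SparkSession` named `spark`; the query itself is not part of this commit):

```scala
import org.apache.spark.sql.functions.rand

// Grouping by a nondeterministic expression works because the analyzer first
// pulls rand(7) into a Project beneath the Aggregate (aliased "_nondeterministic"),
// and the grouping expression then references that attribute instead of
// re-evaluating rand.
val grouped = spark.range(100)
  .groupBy((rand(7) * 3).cast("int").as("bucket"))
  .count()

grouped.explain(true)  // the analyzed plan shows the injected Project
```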