Merge pull request apache#392 from palantir/rk/upstream
robert3005 authored Jul 30, 2018
2 parents df34e2d + 8eb984a commit c4f20b9
Showing 150 changed files with 4,128 additions and 724 deletions.
10 changes: 5 additions & 5 deletions .circleci/config.yml
@@ -167,18 +167,18 @@ jobs:
   run-style-tests:
     # depends only on build-maven
     <<: *test-defaults
-    resource_class: medium
+    resource_class: medium+
     steps:
       - *checkout-code
       # Need maven dependency cache, otherwise checkstyle tests fail as such:
       # Failed to execute goal on project spark-assembly_2.11: Could not resolve dependencies for project org.apache.spark:spark-assembly_2.11:pom:2.4.0-SNAPSHOT
       - restore_cache:
           key: maven-dependency-cache-{{ checksum "pom.xml" }}
       - *restore-build-binaries-cache
-      - run: dev/run-style-tests.py | tee /tmp/run-style-tests.log
-      - store_artifacts:
-          path: /tmp/run-style-tests.log
-          destination: run-style-tests.log
+      - run:
+          name: Run style tests
+          command: dev/run-style-tests.py
+          no_output_timeout: 15m

   run-build-tests:
     # depends only on build-maven
2 changes: 1 addition & 1 deletion appveyor.yml
@@ -48,7 +48,7 @@ install:
   - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival')"

 build_script:
-  - cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package
+  - cmd: mvn -DskipTests -Psparkr -Phive package

 environment:
   NOT_CRAN: true
5 changes: 5 additions & 0 deletions assembly/pom.xml
@@ -74,6 +74,11 @@
       <artifactId>spark-repl_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-avro_2.11</artifactId>
+      <version>${project.version}</version>
+    </dependency>

     <!--
       Because we don't shade dependencies anymore, we need to restore Guava to compile scope so
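With spark-avro bundled into the assembly, a packaged distribution can use the built-in Avro data source out of the box. A hedged sketch of typical usage, assuming a Spark 2.4 session; the paths are hypothetical:

import org.apache.spark.sql.SparkSession

// Minimal sketch: read and write Avro through the built-in data source.
val spark = SparkSession.builder().appName("avro-sketch").getOrCreate()

// "avro" is the data source's short name; the input path is hypothetical.
val df = spark.read.format("avro").load("/data/events.avro")

// Write the same rows back out as Avro (hypothetical output path).
df.write.format("avro").save("/data/events-copy")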
@@ -135,13 +135,14 @@ static class EncryptedMessage extends AbstractFileRegion {
     private final boolean isByteBuf;
     private final ByteBuf buf;
     private final FileRegion region;
+    private final int maxOutboundBlockSize;

     /**
      * A channel used to buffer input data for encryption. The channel has an upper size bound
      * so that if the input is larger than the allowed buffer, it will be broken into multiple
-     * chunks.
+     * chunks. Made non-final to enable lazy initialization, which saves memory.
      */
-    private final ByteArrayWritableChannel byteChannel;
+    private ByteArrayWritableChannel byteChannel;

     private ByteBuf currentHeader;
     private ByteBuffer currentChunk;
@@ -157,7 +158,7 @@ static class EncryptedMessage extends AbstractFileRegion {
       this.isByteBuf = msg instanceof ByteBuf;
       this.buf = isByteBuf ? (ByteBuf) msg : null;
       this.region = isByteBuf ? null : (FileRegion) msg;
-      this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
+      this.maxOutboundBlockSize = maxOutboundBlockSize;
     }

     /**
@@ -292,6 +293,9 @@ public long transferTo(final WritableByteChannel target, final long position)
     }

     private void nextChunk() throws IOException {
+      if (byteChannel == null) {
+        byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
+      }
       byteChannel.reset();
       if (isByteBuf) {
         int copied = byteChannel.write(buf.nioBuffer());
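For illustration only, the allocate-on-first-use idea in the change above can be expressed in Scala with a lazy val; this is a sketch of the pattern, not the transport code, and the byte-array buffer is a stand-in for ByteArrayWritableChannel:

// Sketch of lazy initialization: the buffer is only allocated when a message
// actually needs chunking, so messages that never reach nextChunk() pay nothing.
class ChunkedWriter(maxOutboundBlockSize: Int) {
  // Equivalent to the null-check-then-allocate in nextChunk() above.
  private lazy val buffer = new Array[Byte](maxOutboundBlockSize)

  def nextChunk(): Array[Byte] = {
    // First access triggers the allocation; later calls reuse the same buffer.
    java.util.Arrays.fill(buffer, 0.toByte) // stand-in for byteChannel.reset()
    buffer
  }
}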
42 changes: 42 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.{Experimental, Since}

/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
trait BarrierTaskContext extends TaskContext {

/**
* :: Experimental ::
* Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to
* the MPI_Barrier function in MPI, the barrier() call blocks until all tasks in the same
* stage have reached this routine.
*/
@Experimental
@Since("2.4.0")
def barrier(): Unit

/**
* :: Experimental ::
* Returns all task infos in this barrier stage, ordered by partitionId.
*/
@Experimental
@Since("2.4.0")
def getTaskInfos(): Array[BarrierTaskInfo]
}
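A hedged sketch of the intended programming model, where `sc` is an existing SparkContext and the computation is a placeholder. In this snapshot barrier() is still a no-op stub (see the TODO in BarrierTaskContextImpl below), so the sketch shows intent rather than working synchronization:

// Illustrative two-phase barrier stage.
val rdd = sc.parallelize(1 to 100, numSlices = 4)

val result = rdd.barrier().mapPartitions { (iter, ctx) =>
  val partial = iter.map(_ * 2).toArray // phase 1: local work
  ctx.barrier()                         // block until all 4 tasks reach this point
  partial.iterator                      // phase 2: proceed once every task has arrived
}
result.count()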
49 changes: 49 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.util.Properties

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.MetricsSystem

/** A [[BarrierTaskContext]] implementation. */
private[spark] class BarrierTaskContextImpl(
override val stageId: Int,
override val stageAttemptNumber: Int,
override val partitionId: Int,
override val taskAttemptId: Long,
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
localProperties: Properties,
@transient private val metricsSystem: MetricsSystem,
// The default value is only used in tests.
override val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber,
taskMemoryManager, localProperties, metricsSystem, taskMetrics)
with BarrierTaskContext {

// TODO SPARK-24817 implement global barrier.
override def barrier(): Unit = {}

override def getTaskInfos(): Array[BarrierTaskInfo] = {
val addressesStr = localProperties.getProperty("addresses", "")
addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
}
}
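getTaskInfos() above is a straight split of the "addresses" local property. A small illustration of the parsing, with a hypothetical property value:

import java.util.Properties

import org.apache.spark.BarrierTaskInfo

// Mirrors the parsing in getTaskInfos(); the address list is hypothetical.
val props = new Properties()
props.setProperty("addresses", "10.0.0.1:7337, 10.0.0.2:7337")

val infos = props.getProperty("addresses", "")
  .split(",").map(_.trim()).map(new BarrierTaskInfo(_))

infos.foreach(info => println(info.address))
// 10.0.0.1:7337
// 10.0.0.2:7337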
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.{Experimental, Since}


/**
* :: Experimental ::
* Carries the information of a single barrier task.
*
* @param address the IPv4 address (host:port) of the executor that the barrier task is running on
*/
@Experimental
@Since("2.4.0")
class BarrierTaskInfo(val address: String)
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -434,6 +434,18 @@ private[spark] class MapOutputTrackerMaster(
     }
   }

+  /** Unregister all map output information of the given shuffle. */
+  def unregisterAllMapOutput(shuffleId: Int) {
+    shuffleStatuses.get(shuffleId) match {
+      case Some(shuffleStatus) =>
+        shuffleStatus.removeOutputsByFilter(x => true)
+        incrementEpoch()
+      case None =>
+        throw new SparkException(
+          s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId.")
+    }
+  }
+
   /** Unregister shuffle data */
   def unregisterShuffle(shuffleId: Int) {
     shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
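A hedged sketch of what a caller observes; in real scheduler code the tracker comes from SparkEnv on the driver, the shuffle ID here is hypothetical, and the class is private[spark], so this only runs from Spark-internal code:

import org.apache.spark.{MapOutputTrackerMaster, SparkEnv, SparkException}

// Driver-side illustration: forget every map output of one shuffle so the whole
// stage is recomputed, e.g. after a failure that invalidates all of its tasks.
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

try {
  tracker.unregisterAllMapOutput(0) // hypothetical shuffle ID
} catch {
  case e: SparkException =>
    println(e.getMessage) // raised when no shuffle with that ID is registered
}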
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -23,11 +23,21 @@ import org.apache.spark.{Partition, TaskContext}

 /**
  * An RDD that applies the provided function to every partition of the parent RDD.
+ *
+ * @param prev the parent RDD.
+ * @param f The function used to map a tuple of (TaskContext, partition index, input iterator) to
+ *          an output iterator.
+ * @param preservesPartitioning Whether the input function preserves the partitioner, which should
+ *                              be `false` unless `prev` is a pair RDD and the input function
+ *                              doesn't modify the keys.
+ * @param isFromBarrier Indicates whether this RDD was transformed from an RDDBarrier; a stage
+ *                      containing at least one RDDBarrier is turned into a barrier stage.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
-    preservesPartitioning: Boolean = false)
+    preservesPartitioning: Boolean = false,
+    isFromBarrier: Boolean = false)
   extends RDD[U](prev) {

   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
@@ -41,4 +51,7 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     super.clearDependencies()
     prev = null
   }
+
+  @transient protected lazy override val isBarrier_ : Boolean =
+    isFromBarrier || dependencies.exists(_.rdd.isBarrier())
 }
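Combined with the default isBarrier_ in RDD (next file), the flag propagates down chains of narrow dependencies and resets at shuffle boundaries. A conceptual illustration, with `sc` an existing SparkContext; isBarrier() is private[spark], so the comments describe internal state rather than a public API:

// How the barrier flag travels through an RDD chain (conceptual).
val base = sc.parallelize(1 to 100, 4)  // isBarrier() == false

val barriered = base.barrier()
  .mapPartitions { (it, _) => it }      // MapPartitionsRDD with isFromBarrier = true

val mapped = barriered.map(_ + 1)       // inherits the flag through its dependency

val shuffled = mapped.repartition(2)    // ShuffledRDD: isBarrier() == false again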
27 changes: 26 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.TextOutputFormat

 import org.apache.spark._
 import org.apache.spark.Partitioner._
-import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.partial.BoundedDouble
@@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag](
     }
   }

+  /**
+   * :: Experimental ::
+   * Indicates that Spark must launch the tasks together for the current stage.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
+
   // =======================================================================
   // Other internal methods and fields
   // =======================================================================
@@ -1839,6 +1847,23 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage if at least one of its parent RDD(s), or itself, is mapped from
+   * an [[RDDBarrier]]. This function always returns false for a [[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates the start of a new stage.
+   *
+   * A [[MapPartitionsRDD]] can be transformed from an [[RDDBarrier]], in which case the
+   * [[MapPartitionsRDD]] is marked as barrier.
+   */
+  private[spark] def isBarrier(): Boolean = isBarrier_
+
+  // For performance, cache the value to avoid repeatedly computing `isBarrier()` on a long
+  // RDD chain.
+  @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
 }


52 changes: 52 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.BarrierTaskContext
import org.apache.spark.TaskContext
import org.apache.spark.annotation.{Experimental, Since}

/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */
class RDDBarrier[T: ClassTag](rdd: RDD[T]) {

/**
* :: Experimental ::
* Maps partitions together with a provided BarrierTaskContext.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys.
*/
@Experimental
@Since("2.4.0")
def mapPartitions[S: ClassTag](
f: (Iterator[T], BarrierTaskContext) => Iterator[S],
preservesPartitioning: Boolean = false): RDD[S] = rdd.withScope {
val cleanedF = rdd.sparkContext.clean(f)
new MapPartitionsRDD(
rdd,
(context: TaskContext, index: Int, iter: Iterator[T]) =>
cleanedF(iter, context.asInstanceOf[BarrierTaskContext]),
preservesPartitioning,
isFromBarrier = true
)
}

/** TODO: extra conf (e.g. timeout) */
}
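A hedged usage sketch combining mapPartitions with the context's peer lookup; `sc` is an existing SparkContext, the data is hypothetical, and getTaskInfos() reads the "addresses" property shown in BarrierTaskContextImpl above:

// Each barrier task can see the addresses of all its peers (illustrative).
val words = sc.parallelize(Seq("a", "b", "c", "d"), 2)

val tagged = words.barrier().mapPartitions { (iter, ctx) =>
  val peers = ctx.getTaskInfos().map(_.address).mkString(",") // every executor address
  iter.map(word => s"$word@$peers")
}
tagged.collect()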
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -110,4 +110,6 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
     super.clearDependencies()
     prev = null
   }
+
+  private[spark] override def isBarrier(): Boolean = false
 }
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
   val finished = Array.fill[Boolean](numPartitions)(false)

   var numFinished = 0
+
+  /** Resets the status of all partitions in this stage so they are marked as not finished. */
+  def resetAllPartitions(): Unit = {
+    (0 until numPartitions).foreach(finished.update(_, false))
+    numFinished = 0
+  }
 }