Updates from upstream #392

Merged: 41 commits, Jul 30, 2018

Commits (41)

c44eb56
[SPARK-24768][FOLLOWUP][SQL] Avro migration followup: change artifact…
gengliangwang Jul 25, 2018
571a6f0
[SPARK-23146][K8S] Support client mode.
mccheah Jul 25, 2018
2f77616
[SPARK-24849][SPARK-24911][SQL] Converting a value of StructType to a…
MaxGekk Jul 25, 2018
0c83f71
[SPARK-23146][K8S][TESTS] Enable client mode integration test.
mccheah Jul 25, 2018
17f469b
[SPARK-24860][SQL] Support setting of partitionOverWriteMode in outpu…
koertkuipers Jul 25, 2018
d2e7deb
[SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter
gatorsmile Jul 26, 2018
c9b233d
[SPARK-24878][SQL] Fix reverse function for array type of primitive t…
ueshin Jul 26, 2018
58353d7
[SPARK-24924][SQL] Add mapping for built-in Avro data source
dongjoon-hyun Jul 26, 2018
5ed7660
[SPARK-24802][SQL][FOLLOW-UP] Add a new config for Optimization Rule …
maryannxue Jul 26, 2018
e3486e1
[SPARK-24795][CORE] Implement barrier execution mode
jiangxb1987 Jul 26, 2018
2c82745
[SPARK-24307][CORE] Add conf to revert to old code.
squito Jul 26, 2018
fa09d91
[SPARK-24919][BUILD] New linter rule for sparkContext.hadoopConfigura…
gengliangwang Jul 26, 2018
094aa59
[SPARK-24801][CORE] Avoid memory waste by empty byte[] arrays in Sasl…
misha-cloudera Jul 27, 2018
dc3713c
[SPARK-24829][STS] In Spark Thrift Server, CAST AS FLOAT inconsistent…
Jul 27, 2018
f9c9d80
[SPARK-24929][INFRA] Make merge script don't swallow KeyboardInterrupt
HyukjinKwon Jul 27, 2018
e6e9031
[SPARK-24865] Remove AnalysisBarrier
rxin Jul 27, 2018
21fcac1
[SPARK-24288][SQL] Add a JDBC Option to enable preventing predicate p…
maryannxue Jul 27, 2018
ef6c839
[SPARK-23928][SQL] Add shuffle collection function.
huizhilu Jul 27, 2018
c9bec1d
[SPARK-24927][BUILD][BRANCH-2.3] The scope of snappy-java cannot be "…
liancheng Jul 27, 2018
0a0f68b
[SPARK-24881][SQL] New Avro option - compression
MaxGekk Jul 27, 2018
ee5a5a0
[SPARK-21960][STREAMING] Spark Streaming Dynamic Allocation should re…
karth295 Jul 27, 2018
5828f41
[SPARK-13343] speculative tasks that didn't commit shouldn't be marke…
Jul 27, 2018
10f1f19
[SPARK-21274][SQL] Implement EXCEPT ALL clause.
dilipbiswal Jul 27, 2018
34ebcc6
[MINOR] Improve documentation for HiveStringType's
rxin Jul 27, 2018
6424b14
[MINOR] Update docs for functions.scala to make it clear not all the …
rxin Jul 28, 2018
e875209
[SPARK-24624][SQL][PYTHON] Support mixture of Python UDF and Scalar P…
icexelloss Jul 28, 2018
c6a3db2
[SPARK-24924][SQL][FOLLOW-UP] Add mapping for built-in Avro data source
gatorsmile Jul 28, 2018
c5b8d54
[SPARK-24950][SQL] DateTimeUtilsSuite daysToMillis and millisToDays f…
d80tb7 Jul 28, 2018
8fe5d2c
[SPARK-24956][Build][test-maven] Upgrade maven version to 3.5.4
kiszk Jul 29, 2018
2c54aae
[SPARK-24809][SQL] Serializing LongToUnsafeRowMap in executor may res…
liutang123 Jul 29, 2018
3695ba5
[MINOR][CORE][TEST] Fix afterEach() in TastSetManagerSuite and TaskSc…
jiangxb1987 Jul 30, 2018
3210121
[MINOR][BUILD] Remove -Phive-thriftserver profile within appveyor.yml
HyukjinKwon Jul 30, 2018
6690924
[MINOR] Avoid the 'latest' link that might vary per release in functi…
HyukjinKwon Jul 30, 2018
65a4bc1
[SPARK-21274][SQL] Implement INTERSECT ALL clause
dilipbiswal Jul 30, 2018
bfe60fc
[SPARK-24934][SQL] Explicitly whitelist supported types in upper/lowe…
HyukjinKwon Jul 30, 2018
e3131cf
Merge branch 'master' into rk/upstream
Jul 30, 2018
2a6d99b
Resolve conflicts
Jul 30, 2018
4a798e7
Ignore SQLQuerySuite 'SPARK-18355 Read data from a hive table with a …
Jul 30, 2018
5889265
increase resource class
Jul 30, 2018
d0cc1fb
no need to constantly restart spark sessions
Jul 30, 2018
8eb984a
no tee and timeout
Jul 30, 2018

Files changed

10 changes: 5 additions & 5 deletions .circleci/config.yml
@@ -167,18 +167,18 @@ jobs:
run-style-tests:
# depends only on build-maven
<<: *test-defaults
resource_class: medium
resource_class: medium+
steps:
- *checkout-code
# Need maven dependency cache, otherwise checkstyle tests fail as such:
# Failed to execute goal on project spark-assembly_2.11: Could not resolve dependencies for project org.apache.spark:spark-assembly_2.11:pom:2.4.0-SNAPSHOT
- restore_cache:
key: maven-dependency-cache-{{ checksum "pom.xml" }}
- *restore-build-binaries-cache
- run: dev/run-style-tests.py | tee /tmp/run-style-tests.log
- store_artifacts:
path: /tmp/run-style-tests.log
destination: run-style-tests.log
- run:
name: Run style tests
command: dev/run-style-tests.py
no_output_timeout: 15m

run-build-tests:
# depends only on build-maven
2 changes: 1 addition & 1 deletion appveyor.yml
@@ -48,7 +48,7 @@ install:
- cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival')"

build_script:
- cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package
- cmd: mvn -DskipTests -Psparkr -Phive package

environment:
NOT_CRAN: true
5 changes: 5 additions & 0 deletions assembly/pom.xml
@@ -74,6 +74,11 @@
<artifactId>spark-repl_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>${project.version}</version>
</dependency>

<!--
Because we don't shade dependencies anymore, we need to restore Guava to compile scope so
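For context, several commits in this PR (SPARK-24768, SPARK-24924, SPARK-24881) wire the new
built-in spark-avro module into the build and map the short name "avro" to it. A minimal usage
sketch, assuming a SparkSession named spark and illustrative file paths; this is not code from
the diff itself:

val events = spark.read.format("avro").load("/tmp/events.avro")
events.write
  .format("avro")
  .option("compression", "snappy")  // write-side compression option added by SPARK-24881
  .save("/tmp/events_copy.avro")
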
@@ -135,13 +135,14 @@ static class EncryptedMessage extends AbstractFileRegion {
private final boolean isByteBuf;
private final ByteBuf buf;
private final FileRegion region;
private final int maxOutboundBlockSize;

/**
* A channel used to buffer input data for encryption. The channel has an upper size bound
* so that if the input is larger than the allowed buffer, it will be broken into multiple
* chunks.
* chunks. Made non-final to enable lazy initialization, which saves memory.
*/
private final ByteArrayWritableChannel byteChannel;
private ByteArrayWritableChannel byteChannel;

private ByteBuf currentHeader;
private ByteBuffer currentChunk;
@@ -157,7 +158,7 @@ static class EncryptedMessage extends AbstractFileRegion {
this.isByteBuf = msg instanceof ByteBuf;
this.buf = isByteBuf ? (ByteBuf) msg : null;
this.region = isByteBuf ? null : (FileRegion) msg;
this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
this.maxOutboundBlockSize = maxOutboundBlockSize;
}

/**
@@ -292,6 +293,9 @@ public long transferTo(final WritableByteChannel target, final long position)
}

private void nextChunk() throws IOException {
if (byteChannel == null) {
byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
}
byteChannel.reset();
if (isByteBuf) {
int copied = byteChannel.write(buf.nioBuffer());
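The hunks above (SPARK-24801) stop allocating byteChannel in the constructor and create it on
first use in nextChunk(), so a pending message only pays for the buffer once a chunk is actually
produced. A rough Scala sketch of the same lazy-allocation pattern, with made-up names; not code
from this diff:

class ChunkedMessage(maxOutboundBlockSize: Int) {
  // Allocated lazily, mirroring the now-nullable byteChannel field above.
  private var buffer: Array[Byte] = _

  def nextChunk(): Array[Byte] = {
    if (buffer == null) buffer = new Array[Byte](maxOutboundBlockSize)
    buffer
  }
}
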
42 changes: 42 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.{Experimental, Since}

/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
trait BarrierTaskContext extends TaskContext {

/**
* :: Experimental ::
* Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to
* MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same
* stage have reached this routine.
*/
@Experimental
@Since("2.4.0")
def barrier(): Unit

/**
* :: Experimental ::
* Returns all the task infos in this barrier stage, ordered by partitionId.
*/
@Experimental
@Since("2.4.0")
def getTaskInfos(): Array[BarrierTaskInfo]
}
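
A hedged sketch of the task-side contract this trait defines, using the RDDBarrier entry point
added later in this diff (see RDDBarrier.scala below). rdd is assumed to be an existing RDD[Int],
and barrier() is still a no-op TODO at this point in the PR:

val perTask = rdd.barrier().mapPartitions { (iter: Iterator[Int], ctx: BarrierTaskContext) =>
  val rows = iter.toArray           // partition-local work happens first
  ctx.barrier()                     // every task in the stage must reach this call
  val peers = ctx.getTaskInfos()    // peer task infos, ordered by partitionId
  Iterator.single((peers.map(_.address).mkString(","), rows.length))
}
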
49 changes: 49 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.util.Properties

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.MetricsSystem

/** A [[BarrierTaskContext]] implementation. */
private[spark] class BarrierTaskContextImpl(
override val stageId: Int,
override val stageAttemptNumber: Int,
override val partitionId: Int,
override val taskAttemptId: Long,
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
localProperties: Properties,
@transient private val metricsSystem: MetricsSystem,
// The default value is only used in tests.
override val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber,
taskMemoryManager, localProperties, metricsSystem, taskMetrics)
with BarrierTaskContext {

// TODO SPARK-24817 implement global barrier.
override def barrier(): Unit = {}

override def getTaskInfos(): Array[BarrierTaskInfo] = {
val addressesStr = localProperties.getProperty("addresses", "")
addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
}
}
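
An illustration of what the interim getTaskInfos() above returns, using a made-up value for the
"addresses" local property:

val addressesStr = "host1:40001, host2:40002"   // hypothetical localProperties("addresses") value
val infos = addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
infos.map(_.address).toSeq                      // Seq("host1:40001", "host2:40002")
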
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.{Experimental, Since}


/**
* :: Experimental ::
* Carries all task infos of a barrier task.
*
* @param address the IPv4 address(host:port) of the executor that a barrier task is running on
*/
@Experimental
@Since("2.4.0")
class BarrierTaskInfo(val address: String)
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -434,6 +434,18 @@ private[spark] class MapOutputTrackerMaster(
}
}

/** Unregister all map output information of the given shuffle. */
def unregisterAllMapOutput(shuffleId: Int) {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.removeOutputsByFilter(x => true)
incrementEpoch()
case None =>
throw new SparkException(
s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId.")
}
}

/** Unregister shuffle data */
def unregisterShuffle(shuffleId: Int) {
shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
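A hedged sketch of the intended call pattern for the new helper; the caller and variable names
here are assumptions, not code from this PR:

// When every task of a shuffle map stage has to be re-run (e.g. a whole-stage retry),
// drop all of its registered map output in one call; the tracker also bumps its epoch.
mapOutputTracker.unregisterAllMapOutput(shuffleDep.shuffleId)
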
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -23,11 +23,21 @@ import org.apache.spark.{Partition, TaskContext}

/**
* An RDD that applies the provided function to every partition of the parent RDD.
*
* @param prev the parent RDD.
* @param f The function used to map a tuple of (TaskContext, partition index, input iterator) to
* an output iterator.
* @param preservesPartitioning Whether the input function preserves the partitioner, which should
* be `false` unless `prev` is a pair RDD and the input function
* doesn't modify the keys.
* @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier; a stage
* containing at least one RDDBarrier shall be turned into a barrier stage.
*/
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false)
extends RDD[U](prev) {

override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
@@ -41,4 +51,7 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
super.clearDependencies()
prev = null
}

@transient protected lazy override val isBarrier_ : Boolean =
isFromBarrier || dependencies.exists(_.rdd.isBarrier())
}
27 changes: 26 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.TextOutputFormat

import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.partial.BoundedDouble
Expand Down Expand Up @@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag](
}
}

/**
* :: Experimental ::
* Indicates that Spark must launch the tasks together for the current stage.
*/
@Experimental
@Since("2.4.0")
def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))

// =======================================================================
// Other internal methods and fields
// =======================================================================
@@ -1839,6 +1847,23 @@
def toJavaRDD() : JavaRDD[T] = {
new JavaRDD(this)(elementClassTag)
}

/**
* Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a
* barrier stage.
*
* An RDD is in a barrier stage if at least one of its parent RDD(s), or itself, is mapped from
* an [[RDDBarrier]]. This function always returns false for a [[ShuffledRDD]], since a
* [[ShuffledRDD]] indicates start of a new stage.
*
* A [[MapPartitionsRDD]] can be transformed from an [[RDDBarrier]]; in that case the
* [[MapPartitionsRDD]] shall be marked as barrier.
*/
private[spark] def isBarrier(): Boolean = isBarrier_

// For performance reasons, cache the value to avoid repeatedly computing `isBarrier()` on a long
// RDD chain.
@transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
}


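A hedged sketch of how the new barrier flag propagates through a lineage. isBarrier() is
private[spark], so this is the view a test inside the org.apache.spark package would see; the
local names are assumed:

val base = sc.parallelize(1 to 8, numSlices = 2)
base.isBarrier()                                            // false: nothing upstream is a barrier
val mapped = base.barrier().mapPartitions((iter, _) => iter)
mapped.isBarrier()                                          // true: transformed from an RDDBarrier
val reduced = mapped.map((_, 1)).reduceByKey(_ + _)
reduced.isBarrier()                                         // false: a ShuffledRDD starts a new stage
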
52 changes: 52 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.BarrierTaskContext
import org.apache.spark.TaskContext
import org.apache.spark.annotation.{Experimental, Since}

/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */
class RDDBarrier[T: ClassTag](rdd: RDD[T]) {

/**
* :: Experimental ::
* Maps partitions together with a provided BarrierTaskContext.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys.
*/
@Experimental
@Since("2.4.0")
def mapPartitions[S: ClassTag](
f: (Iterator[T], BarrierTaskContext) => Iterator[S],
preservesPartitioning: Boolean = false): RDD[S] = rdd.withScope {
val cleanedF = rdd.sparkContext.clean(f)
new MapPartitionsRDD(
rdd,
(context: TaskContext, index: Int, iter: Iterator[T]) =>
cleanedF(iter, context.asInstanceOf[BarrierTaskContext]),
preservesPartitioning,
isFromBarrier = true
)
}

/** TODO extra conf(e.g. timeout) */
}
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -110,4 +110,6 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
super.clearDependencies()
prev = null
}

private[spark] override def isBarrier(): Boolean = false
}
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
val finished = Array.fill[Boolean](numPartitions)(false)

var numFinished = 0

/** Resets the status of all partitions in this stage so they are marked as not finished. */
def resetAllPartitions(): Unit = {
(0 until numPartitions).foreach(finished.update(_, false))
numFinished = 0
}
}