Commit f86f40e

Merge remote-tracking branch 'upstream/master' into SPARK-23587

viirya committed Mar 8, 2018
2 parents c55a634 + 77c91cc
Showing 40 changed files with 407 additions and 357 deletions.
10 changes: 8 additions & 2 deletions R/pkg/R/column.R
@@ -161,12 +161,18 @@ setMethod("alias",
#' @aliases substr,Column-method
#'
#' @param x a Column.
-#' @param start starting position.
+#' @param start starting position. It should be 1-based.
#' @param stop ending position.
+#' @examples
+#' \dontrun{
+#' df <- createDataFrame(list(list(a="abcdef")))
+#' collect(select(df, substr(df$a, 1, 4))) # the result is `abcd`.
+#' collect(select(df, substr(df$a, 2, 4))) # the result is `bcd`.
+#' }
#' @note substr since 1.4.0
setMethod("substr", signature(x = "Column"),
function(x, start, stop) {
-            jc <- callJMethod(x@jc, "substr", as.integer(start - 1), as.integer(stop - start + 1))
+            jc <- callJMethod(x@jc, "substr", as.integer(start), as.integer(stop - start + 1))
column(jc)
})

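For reference, the R wrapper delegates to the JVM-side Column.substr(startPos, len), which takes a 1-based start position and a length; the fix above stops subtracting 1 from the start. A minimal Scala sketch of the equivalent calls, assuming a local SparkSession:

import org.apache.spark.sql.SparkSession

object SubstrSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("substr-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("abcdef").toDF("a")
    // substr(df$a, start, stop) in R maps to substr(start, stop - start + 1)
    // here, both 1-based, so these mirror the two R examples above.
    df.select(df("a").substr(1, 4)).show() // abcd
    df.select(df("a").substr(2, 3)).show() // bcd
    spark.stop()
  }
}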
1 change: 1 addition & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1651,6 +1651,7 @@ test_that("string operators", {
expect_false(first(select(df, startsWith(df$name, "m")))[[1]])
expect_true(first(select(df, endsWith(df$name, "el")))[[1]])
expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
+  expect_equal(first(select(df, substr(df$name, 4, 6)))[[1]], "hae")
if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
expect_true(startsWith("Hello World", "Hello"))
expect_false(endsWith("Hello World", "a"))
26 changes: 25 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -22,7 +22,7 @@ import java.net.{HttpURLConnection, URI, URL}
import java.nio.charset.StandardCharsets
import java.security.SecureRandom
import java.security.cert.X509Certificate
-import java.util.Arrays
+import java.util.{Arrays, Properties}
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import java.util.jar.{JarEntry, JarOutputStream}
import javax.net.ssl._
@@ -35,6 +35,7 @@ import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

import com.google.common.io.{ByteStreams, Files}
+import org.apache.log4j.PropertyConfigurator

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
@@ -256,6 +257,29 @@ private[spark] object TestUtils {
s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
}

+  /**
+   * Configure log4j properties for a test suite.
+   */
+  def configTestLog4j(level: String): Unit = {
+    val pro = new Properties()
+    pro.put("log4j.rootLogger", s"$level, console")
+    pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
+    pro.put("log4j.appender.console.target", "System.err")
+    pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
+    pro.put("log4j.appender.console.layout.ConversionPattern",
+      "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
+    PropertyConfigurator.configure(pro)
+  }

+  /**
+   * Lists files recursively.
+   */
+  def recursiveList(f: File): Array[File] = {
+    require(f.isDirectory)
+    val current = f.listFiles
+    current ++ current.filter(_.isDirectory).flatMap(recursiveList)
+  }

}


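A rough sketch of how the two new helpers might be used from a suite; TestUtils is private[spark], so this only compiles inside Spark's own tree, and the scratch directory path is made up:

import java.io.File

import org.apache.spark.TestUtils

object TestUtilsSketch {
  def main(args: Array[String]): Unit = {
    // Route test logging to stderr at INFO before forking a child JVM.
    TestUtils.configTestLog4j("INFO")

    // Recursively collect files under a (hypothetical) scratch directory.
    val logs: Array[File] = TestUtils.recursiveList(new File("/tmp/spark-scratch"))
      .filter(_.getName.endsWith(".log"))
    logs.foreach(f => println(f.getAbsolutePath))
  }
}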
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

-import java.io.{ByteArrayOutputStream, PrintStream}
+import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.lang.reflect.InvocationTargetException
import java.net.URI
import java.nio.charset.StandardCharsets
@@ -233,7 +233,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String]
// Set name from main class if not given
name = Option(name).orElse(Option(mainClass)).orNull
if (name == null && primaryResource != null) {
-      name = Utils.stripDirectory(primaryResource)
+      name = new File(primaryResource).getName()
}

// Action should be SUBMIT unless otherwise specified
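The replacement drops a one-off Utils helper in favor of plain java.io.File name parsing. A quick REPL-style sketch of the behavior (the URI below is made up):

import java.io.File

// getName returns everything after the last path separator, so a
// scheme-qualified primary resource still reduces to its file name.
new File("spark-examples.jar").getName                      // spark-examples.jar
new File("hdfs://nn:8020/apps/spark-examples.jar").getName  // spark-examples.jar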
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -35,6 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
@@ -141,8 +142,7 @@ private[spark] class Executor(
conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
RpcUtils.maxMessageSizeBytes(conf))

-  // Limit of bytes for total size of results (default is 1GB)
-  private val maxResultSize = Utils.getMaxResultSize(conf)
+  private val maxResultSize = conf.get(MAX_RESULT_SIZE)

// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -23,6 +23,7 @@ import java.util.regex.PatternSyntaxException
import scala.util.matching.Regex

import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+import org.apache.spark.util.Utils

private object ConfigHelpers {

@@ -45,7 +46,7 @@
}

def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
-    str.split(",").map(_.trim()).filter(_.nonEmpty).map(converter)
+    Utils.stringToSeq(str).map(converter)
}

def seqToString[T](v: Seq[T], stringConverter: T => String): String = {
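Utils.stringToSeq now centralizes the split/trim/filter logic that was previously inlined here. A REPL-style sketch of the expected behavior (both objects are private to Spark, so this is illustrative only):

import org.apache.spark.util.Utils

Utils.stringToSeq(" a, b, , c ")               // Seq("a", "b", "c")
ConfigHelpers.stringToSeq("1, 2, 3", _.toInt)  // Seq(1, 2, 3)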
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -520,4 +520,9 @@ package object config {
.checkValue(v => v > 0, "The threshold should be positive.")
.createWithDefault(10000000)

+  private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
+    .doc("Size limit for results.")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefaultString("1g")

}
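With a typed ConfigEntry, callers such as Executor above and TaskSetManager below read the limit directly as bytes instead of going through the removed Utils.getMaxResultSize. A sketch, noting that conf.get(entry) is private[spark]:

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val conf = new SparkConf().set("spark.driver.maxResultSize", "2g")
val maxResultSize: Long = conf.get(MAX_RESULT_SIZE) // 2147483648L; unset falls back to "1g"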
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -64,8 +64,7 @@ private[spark] class TaskSetManager(
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)

-  // Limit of bytes for total size of results (default is 1GB)
-  val maxResultSize = Utils.getMaxResultSize(conf)
+  val maxResultSize = conf.get(config.MAX_RESULT_SIZE)

val speculationEnabled = conf.getBoolean("spark.speculation", false)

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -90,7 +90,7 @@ final class ShuffleBlockFetcherIterator(
private[this] val startTime = System.currentTimeMillis

/** Local blocks to fetch, excluding zero-sized blocks. */
-  private[this] val localBlocks = new ArrayBuffer[BlockId]()
+  private[this] val localBlocks = scala.collection.mutable.LinkedHashSet[BlockId]()

/** Remote blocks to fetch, excluding zero-sized blocks. */
private[this] val remoteBlocks = new HashSet[BlockId]()
@@ -316,6 +316,7 @@
* track in-memory are the ManagedBuffer references themselves.
*/
private[this] def fetchLocalBlocks() {
+    logDebug(s"Start fetching local blocks: ${localBlocks.mkString(", ")}")
val iter = localBlocks.iterator
while (iter.hasNext) {
val blockId = iter.next()
@@ -324,7 +325,8 @@
shuffleMetrics.incLocalBlocksFetched(1)
shuffleMetrics.incLocalBytesRead(buf.size)
buf.retain()
-        results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf, false))
+        results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId,
+          buf.size(), buf, false))
} catch {
case e: Exception =>
// If we see an exception, stop immediately.
@@ -397,7 +399,9 @@
}
shuffleMetrics.incRemoteBlocksFetched(1)
}
-        bytesInFlight -= size
+        if (!localBlocks.contains(blockId)) {
+          bytesInFlight -= size
+        }
if (isNetworkReqDone) {
reqsInFlight -= 1
logDebug("Number of requests in flight " + reqsInFlight)
@@ -583,8 +587,8 @@ object ShuffleBlockFetcherIterator {
* Result of a fetch from a remote block successfully.
* @param blockId block id
* @param address BlockManager that the block was fetched from.
-   * @param size estimated size of the block, used to calculate bytesInFlight.
-   *             Note that this is NOT the exact bytes.
+   * @param size estimated size of the block. Note that this is NOT the exact bytes.
+   *             Size of remote block is used to calculate bytesInFlight.
* @param buf `ManagedBuffer` for the content.
* @param isNetworkReqDone Is this the last network request for this host in this fetch request.
*/
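Net effect of the hunks above: local blocks now report their real buffer size for shuffle metrics, while only network-fetched blocks are charged against bytesInFlight. A simplified, hypothetical model of that accounting (names invented; not Spark API):

final case class FetchResult(blockId: String, size: Long, isLocal: Boolean)

object InFlightSketch {
  var bytesInFlight: Long = 0L

  // Only remote fetches ever occupied network budget, so only they
  // release it when their result is consumed.
  def onResultConsumed(r: FetchResult): Unit = {
    if (!r.isLocal) {
      bytesInFlight -= r.size
    }
  }
}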
(Diff truncated: the remaining changed files were not loaded.)
