# [SPARK-26817][CORE] Use System.nanoTime to measure time intervals
## What changes were proposed in this pull request?

In this PR, I propose to use `System.nanoTime()` instead of `System.currentTimeMillis()` for measuring time intervals.

`System.currentTimeMillis()` returns the current wall-clock time and follows changes to the system clock. Negative wall-clock adjustments can therefore cause timeouts to "hang" for a long time (until the wall clock catches up to its previous value). This can happen when ntpd performs a "step" after the network has been disconnected for a while; the canonical example is system boot-up, when DHCP takes longer than usual. Such failures are very hard to understand and reproduce. `System.nanoTime()` is guaranteed to be monotonic and is unaffected by wall-clock changes.
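For illustration (this snippet is not part of the patch), the interval-measurement pattern used throughout the change is: take two `System.nanoTime()` readings, keep the arithmetic in nanoseconds, and convert to milliseconds only when reporting.

```scala
import java.util.concurrent.TimeUnit

object ElapsedTimeExample {
  def main(args: Array[String]): Unit = {
    val startTimeNs = System.nanoTime()   // monotonic starting point
    Thread.sleep(250)                     // stand-in for the work being timed
    val elapsedNs = System.nanoTime() - startTimeNs
    // Convert only at the reporting boundary; intermediate values stay in nanoseconds.
    println(s"took ${TimeUnit.NANOSECONDS.toMillis(elapsedNs)} ms")
  }
}
```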

## How was this patch tested?

By existing test suites.

Closes apache#23727 from MaxGekk/system-nanotime.

Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
2 people authored and jackylee-ch committed Feb 18, 2019
1 parent 2a0c294 commit ef4f68b
Showing 40 changed files with 202 additions and 168 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,7 +18,7 @@
package org.apache.spark

import java.io._
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor}
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.JavaConverters._
@@ -706,7 +706,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
val startTime = System.currentTimeMillis
val startTimeNs = System.nanoTime()
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
// Someone else is fetching it; wait for them to be done
@@ -744,7 +744,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
}
logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
s"${System.currentTimeMillis - startTime} ms")
s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")

if (fetchedStatuses != null) {
fetchedStatuses
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -19,8 +19,8 @@ package org.apache.spark.api.python

import java.io.{DataInputStream, DataOutputStream, EOFException, InputStream, OutputStreamWriter}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
import java.nio.charset.StandardCharsets
import java.util.Arrays
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
@@ -84,7 +84,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
@GuardedBy("self")
private val idleWorkers = new mutable.Queue[Socket]()
@GuardedBy("self")
private var lastActivity = 0L
private var lastActivityNs = 0L
new MonitorThread().start()

@GuardedBy("self")
@@ -291,9 +291,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
override def run() {
while (true) {
self.synchronized {
if (lastActivity + IDLE_WORKER_TIMEOUT_MS < System.currentTimeMillis()) {
if (IDLE_WORKER_TIMEOUT_NS < System.nanoTime() - lastActivityNs) {
cleanupIdleWorkers()
lastActivity = System.currentTimeMillis()
lastActivityNs = System.nanoTime()
}
}
Thread.sleep(10000)
@@ -358,7 +358,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
def releaseWorker(worker: Socket) {
if (useDaemon) {
self.synchronized {
lastActivity = System.currentTimeMillis()
lastActivityNs = System.nanoTime()
idleWorkers.enqueue(worker)
}
} else {
@@ -375,5 +375,5 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

private object PythonWorkerFactory {
val PROCESS_WAIT_TIMEOUT_MS = 10000
val IDLE_WORKER_TIMEOUT_MS = 60000 // kill idle workers after 1 minute
val IDLE_WORKER_TIMEOUT_NS = TimeUnit.MINUTES.toNanos(1) // kill idle workers after 1 minute
}
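A note on the `MonitorThread` change above: because `System.nanoTime()` has an arbitrary origin (and can be negative), timeouts are checked against the elapsed difference rather than against absolute timestamps. A standalone sketch of that idiom (the class and method names here are illustrative, not Spark's):

```scala
import java.util.concurrent.TimeUnit

class IdleTracker(timeoutNs: Long = TimeUnit.MINUTES.toNanos(1)) {
  @volatile private var lastActivityNs: Long = System.nanoTime()

  def touch(): Unit = { lastActivityNs = System.nanoTime() }

  // Overflow-safe: compare the elapsed difference to the timeout,
  // never one absolute nanoTime value to another plus an offset.
  def isIdle: Boolean = System.nanoTime() - lastActivityNs > timeoutNs
}
```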
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -239,9 +239,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
val estimatedTotalSize = Utils.bytesToString(numBlocks * blockSize)
logInfo(s"Started reading broadcast variable $id with $numBlocks pieces " +
s"(estimated total size $estimatedTotalSize)")
val startTimeMs = System.currentTimeMillis()
val startTimeNs = System.nanoTime()
val blocks = readBlocks()
logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
logInfo(s"Reading broadcast variable $id took ${Utils.getUsedTimeNs(startTimeNs)}")

try {
val obj = TorrentBroadcast.unBlockifyObject[T](
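`Utils.getUsedTimeNs` replaces `Utils.getUsedTimeMs`; its definition lives in Utils.scala, which is not among the files shown on this page. A plausible sketch of such a helper (an assumption, not the actual Spark code):

```scala
import java.util.concurrent.TimeUnit

object TimingHelpers {
  // Hypothetical stand-in for Utils.getUsedTimeNs: formats the time elapsed
  // since a System.nanoTime() reading as a millisecond string for log messages.
  def getUsedTimeNs(startTimeNs: Long): String =
    s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms"
}
```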
45 changes: 26 additions & 19 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -348,10 +348,11 @@ private[spark] class Executor(
* 2. Collect accumulator updates
* 3. Set the finished flag to true and clear current thread's interrupt status
*/
private def collectAccumulatorsAndResetStatusOnFailure(taskStartTime: Long) = {
private def collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs: Long) = {
// Report executor runtime and JVM gc time
Option(task).foreach(t => {
t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStartTime)
t.metrics.setExecutorRunTime(TimeUnit.NANOSECONDS.toMillis(
System.nanoTime() - taskStartTimeNs))
t.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
})

@@ -369,15 +370,15 @@
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStartTime: Long = 0
var taskStartTimeNs: Long = 0
var taskStartCpu: Long = 0
startGCTime = computeTotalGcTime()

@@ -413,7 +414,7 @@
}

// Run the actual task and measure its runtime.
taskStartTime = System.currentTimeMillis()
taskStartTimeNs = System.nanoTime()
taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
@@ -457,7 +458,7 @@
s"unrecoverable fetch failures! Most likely this means user code is incorrectly " +
s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure)
}
val taskFinish = System.currentTimeMillis()
val taskFinishNs = System.nanoTime()
val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
@@ -466,22 +467,24 @@
task.context.killTaskIfInterrupted()

val resultSer = env.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val beforeSerializationNs = System.nanoTime()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
val afterSerializationNs = System.nanoTime()

// Deserialization happens in two parts: first, we deserialize a Task object, which
// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
task.metrics.setExecutorDeserializeTime(
(taskStartTime - deserializeStartTime) + task.executorDeserializeTime)
task.metrics.setExecutorDeserializeTime(TimeUnit.NANOSECONDS.toMillis(
(taskStartTimeNs - deserializeStartTimeNs) + task.executorDeserializeTimeNs))
task.metrics.setExecutorDeserializeCpuTime(
(taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
// We need to subtract Task.run()'s deserialization time to avoid double-counting
task.metrics.setExecutorRunTime((taskFinish - taskStartTime) - task.executorDeserializeTime)
task.metrics.setExecutorRunTime(TimeUnit.NANOSECONDS.toMillis(
(taskFinishNs - taskStartTimeNs) - task.executorDeserializeTimeNs))
task.metrics.setExecutorCpuTime(
(taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
task.metrics.setResultSerializationTime(TimeUnit.NANOSECONDS.toMillis(
afterSerializationNs - beforeSerializationNs))

// Expose task metrics using the Dropwizard metrics system.
// Update task metrics counters
@@ -560,7 +563,7 @@
case t: TaskKilledException =>
logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")

val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums))
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

@@ -569,7 +572,7 @@
val killReason = task.reasonIfKilled.getOrElse("unknown reason")
logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")

val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

@@ -604,7 +607,7 @@
// the task failure would not be ignored if the shutdown happened because of premption,
// instead of an app issue).
if (!ShutdownHookManager.inShutdown()) {
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)

val serializedTaskEndReason = {
try {
@@ -669,14 +672,16 @@

private[this] val killPollingIntervalMs: Long = conf.get(TASK_REAPER_POLLING_INTERVAL)

private[this] val killTimeoutMs: Long = conf.get(TASK_REAPER_KILL_TIMEOUT)
private[this] val killTimeoutNs: Long = {
TimeUnit.MILLISECONDS.toNanos(conf.get(TASK_REAPER_KILL_TIMEOUT))
}

private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)

override def run(): Unit = {
val startTimeMs = System.currentTimeMillis()
def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
def timeoutExceeded(): Boolean = killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs
val startTimeNs = System.nanoTime()
def elapsedTimeNs = System.nanoTime() - startTimeNs
def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs
try {
// Only attempt to kill the task once. If interruptThread = false then a second kill
// attempt would be a no-op and if interruptThread = true then it may not be safe or
@@ -701,6 +706,7 @@
if (taskRunner.isFinished) {
finished = true
} else {
val elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeNs)
logWarning(s"Killed task $taskId is still running after $elapsedTimeMs ms")
if (takeThreadDump) {
try {
@@ -718,6 +724,7 @@
}

if (!taskRunner.isFinished && timeoutExceeded()) {
val killTimeoutMs = TimeUnit.NANOSECONDS.toMillis(killTimeoutNs)
if (isLocal) {
logError(s"Killed task $taskId could not be stopped within $killTimeoutMs ms; " +
"not killing JVM because we are running in local mode.")
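In the `TaskReaper` changes above, the configured kill timeout is read in milliseconds, converted to nanoseconds once, and compared against nanosecond intervals; values are converted back to milliseconds only for log messages. A condensed sketch of that pattern (simplified, not the actual reaper logic):

```scala
import java.util.concurrent.TimeUnit

object ReaperTimingSketch {
  // Configured in milliseconds at the boundary, converted to nanoseconds once.
  val killTimeoutNs: Long = TimeUnit.MILLISECONDS.toNanos(10000L)

  def awaitOrWarn(isFinished: => Boolean): Unit = {
    val startTimeNs = System.nanoTime()
    def elapsedNs: Long = System.nanoTime() - startTimeNs
    def timeoutExceeded: Boolean = killTimeoutNs > 0 && elapsedNs > killTimeoutNs
    while (!isFinished && !timeoutExceeded) {
      Thread.sleep(100)
    }
    if (!isFinished) {
      // Convert back to milliseconds only when reporting.
      println(s"still running after ${TimeUnit.NANOSECONDS.toMillis(elapsedNs)} ms")
    }
  }
}
```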
core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -75,14 +75,14 @@ private[spark] class ResultTask[T, U](
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -80,14 +80,14 @@ private[spark] class ShuffleMapTask(
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
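`ResultTask` and `ShuffleMapTask` pair the wall-clock interval (now taken with `System.nanoTime()`) with per-thread CPU time from `ThreadMXBean`, which is already reported in nanoseconds. A generic sketch of that pairing (not Spark code):

```scala
import java.lang.management.ManagementFactory

object WallAndCpuTiming {
  // Runs `body` and returns (result, wall-clock nanos, CPU nanos spent by this thread).
  def time[T](body: => T): (T, Long, Long) = {
    val threadMXBean = ManagementFactory.getThreadMXBean
    val cpuSupported = threadMXBean.isCurrentThreadCpuTimeSupported
    val startNs = System.nanoTime()
    val startCpuNs = if (cpuSupported) threadMXBean.getCurrentThreadCpuTime else 0L
    val result = body
    val wallNs = System.nanoTime() - startNs
    val cpuNs = if (cpuSupported) threadMXBean.getCurrentThreadCpuTime - startCpuNs else 0L
    (result, wallNs, cpuNs)
  }
}
```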
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -182,7 +182,7 @@ private[spark] abstract class Task[T](
// context is not yet initialized when kill() is invoked.
@volatile @transient private var _reasonIfKilled: String = null

protected var _executorDeserializeTime: Long = 0
protected var _executorDeserializeTimeNs: Long = 0
protected var _executorDeserializeCpuTime: Long = 0

/**
@@ -193,7 +193,7 @@
/**
* Returns the amount of time spent deserializing the RDD and function to be run.
*/
def executorDeserializeTime: Long = _executorDeserializeTime
def executorDeserializeTimeNs: Long = _executorDeserializeTimeNs
def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime

/**
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -63,9 +63,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
math.min(1, conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
private val maxRegisteredWaitingTimeMs =
conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME)
private val createTime = System.currentTimeMillis()
private val maxRegisteredWaitingTimeNs = TimeUnit.MILLISECONDS.toNanos(
conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME))
private val createTimeNs = System.nanoTime()

// Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
// protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
@@ -499,9 +499,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
return true
}
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
if ((System.nanoTime() - createTimeNs) >= maxRegisteredWaitingTimeNs) {
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeNs(ns)")
return true
}
false
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
@@ -20,6 +20,7 @@ import java.io.{Closeable, InputStream, IOException, OutputStream}
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
import java.util.Properties
import java.util.concurrent.TimeUnit
import javax.crypto.KeyGenerator
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

@@ -133,10 +134,10 @@ private[spark] object CryptoStreamUtils extends Logging {
*/
private[this] def createInitializationVector(properties: Properties): Array[Byte] = {
val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
val initialIVStart = System.currentTimeMillis()
val initialIVStart = System.nanoTime()
CryptoRandomFactory.getCryptoRandom(properties).nextBytes(iv)
val initialIVFinish = System.currentTimeMillis()
val initialIVTime = initialIVFinish - initialIVStart
val initialIVFinish = System.nanoTime()
val initialIVTime = TimeUnit.NANOSECONDS.toMillis(initialIVFinish - initialIVStart)
if (initialIVTime > 2000) {
logWarning(s"It costs ${initialIVTime} milliseconds to create the Initialization Vector " +
s"used by CryptoStream")
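The `CryptoStreamUtils` change keeps the existing behaviour of warning when IV generation is slow, just with a monotonic clock underneath. The same pattern can be wrapped generically; a sketch under the assumption that a plain `println` stands in for Spark's `logWarning`:

```scala
import java.util.concurrent.TimeUnit

object SlowOperationWarning {
  // Runs `body` and warns if it took longer than `thresholdMs` milliseconds.
  def warnIfSlow[T](label: String, thresholdMs: Long = 2000L)(body: => T): T = {
    val startNs = System.nanoTime()
    val result = body
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
    if (elapsedMs > thresholdMs) {
      println(s"It took $elapsedMs ms to $label")  // stand-in for logWarning
    }
    result
  }
}
```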
(The remaining changed files in this commit are not shown here.)
