[SPARK-2952] Enable logging actor messages at DEBUG level
Example messages:
```
14/08/09 21:37:01 DEBUG BlockManagerMasterActor: [actor] received message RegisterBlockManager(BlockManagerId(0, rxin-mbp, 58092, 0),278302556,Actor[akka.tcp://spark@rxin-mbp:58088/user/BlockManagerActor1#-63596539]) from Actor[akka.tcp://spark@rxin-mbp:58088/temp/$c]

14/08/09 21:37:01 DEBUG BlockManagerMasterActor: [actor] handled message (0.279 ms) RegisterBlockManager(BlockManagerId(0, rxin-mbp, 58092, 0),278302556,Actor[akka.tcp://spark@rxin-mbp:58088/user/BlockManagerActor1#-63596539]) from Actor[akka.tcp://spark@rxin-mbp:58088/temp/$c]
```
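
These lines are emitted at DEBUG, so they stay invisible under Spark's default INFO logging. A minimal sketch of how to surface them, assuming the stock log4j 1.x setup Spark ships with (the logger names below are only examples; any class mixing in the new `ActorLogReceive` trait behaves the same way):
```
# conf/log4j.properties — illustrative additions, not part of this commit
# Surface the [actor] received/handled lines for one actor:
log4j.logger.org.apache.spark.storage.BlockManagerMasterActor=DEBUG
# Or, much noisier, for everything under org.apache.spark:
# log4j.logger.org.apache.spark=DEBUG
```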

cc @mengxr @tdas @pwendell

Author: Reynold Xin <[email protected]>

Closes #1870 from rxin/actorLogging and squashes the following commits:

c531ee5 [Reynold Xin] Added license header for ActorLogReceive.
f6b1ebe [Reynold Xin] [SPARK-2952] Enable logging actor messages at DEBUG level
rxin committed Aug 11, 2014
1 parent db06a81 commit 3733866
Showing 13 changed files with 111 additions and 38 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
```diff
@@ -21,6 +21,7 @@ import akka.actor.Actor
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -36,8 +37,10 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
-  override def receive = {
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+  extends Actor with ActorLogReceive with Logging {
+
+  override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
       val response = HeartbeatResponse(
         !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
```
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
```diff
@@ -38,10 +38,10 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
 /** Actor class for MapOutputTrackerMaster */
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
   val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
-  def receive = {
+  override def receiveWithLogging = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
```
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
```diff
@@ -27,12 +27,14 @@ import org.apache.log4j.{Level, Logger}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
 
 /**
  * Proxy that relays messages to the driver.
  */
-private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
+private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
+  extends Actor with ActorLogReceive with Logging {
+
   var masterActor: ActorSelection = _
   val timeout = AkkaUtils.askTimeout(conf)
 
@@ -114,7 +116,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
     }
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
 
     case SubmitDriverResponse(success, driverId, message) =>
       println(message)
```
core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
```diff
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -56,7 +56,7 @@ private[spark] class AppClient(
   var registered = false
   var activeMasterUrl: String = null
 
-  class ClientActor extends Actor with Logging {
+  class ClientActor extends Actor with ActorLogReceive with Logging {
     var master: ActorSelection = null
     var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
     var alreadyDead = false  // To avoid calling listener.dead() multiple times
@@ -119,7 +119,7 @@ private[spark] class AppClient(
         .contains(remoteUrl.hostPort)
     }
 
-    override def receive = {
+    override def receiveWithLogging = {
       case RegisteredApplication(appId_, masterUrl) =>
         appId = appId_
         registered = true
```
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
```diff
@@ -42,14 +42,14 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 private[spark] class Master(
     host: String,
     port: Int,
     webUiPort: Int,
     val securityMgr: SecurityManager)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
 
   import context.dispatcher   // to use Akka's scheduler.schedule()
 
@@ -167,7 +167,7 @@ private[spark] class Master(
     context.stop(leaderElectionAgent)
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
     case ElectedLeader => {
       val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
       state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
```
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
```diff
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -51,7 +51,7 @@ private[spark] class Worker(
     workDirPath: String = null,
     val conf: SparkConf,
     val securityMgr: SecurityManager)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
   import context.dispatcher
 
   Utils.checkHost(host, "Expected hostname")
@@ -187,7 +187,7 @@ private[spark] class Worker(
     }
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
     case RegisteredWorker(masterUrl, masterWebUiUrl) =>
       logInfo("Successfully registered with master " + masterUrl)
       registered = true
```
core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
```diff
@@ -22,13 +22,15 @@ import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, Di
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.DeployMessages.SendHeartbeat
+import org.apache.spark.util.ActorLogReceive
 
 /**
  * Actor which connects to a worker process and terminates the JVM if the connection is severed.
  * Provides fate sharing between a worker and its associated child processes.
  */
-private[spark] class WorkerWatcher(workerUrl: String) extends Actor
-  with Logging {
+private[spark] class WorkerWatcher(workerUrl: String)
+  extends Actor with ActorLogReceive with Logging {
+
   override def preStart() {
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
@@ -48,7 +50,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
 
   def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
 
-  override def receive = {
+  override def receiveWithLogging = {
     case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
       logInfo(s"Successfully connected to $workerUrl")
 
```
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
```diff
@@ -31,14 +31,15 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
     cores: Int,
-    sparkProperties: Seq[(String, String)]) extends Actor with ExecutorBackend with Logging {
+    sparkProperties: Seq[(String, String)])
+  extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 
   Utils.checkHostPort(hostPort, "Expected hostport")
 
@@ -52,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
```
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
```diff
@@ -30,7 +30,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
 import org.apache.spark.ui.JettyUtils
 
 /**
@@ -61,7 +61,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
 
-  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
+
+    override protected def log = CoarseGrainedSchedulerBackend.this.log
+
     private val executorActor = new HashMap[String, ActorRef]
     private val executorAddress = new HashMap[String, Address]
     private val executorHost = new HashMap[String, String]
@@ -79,7 +82,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
     }
 
-    def receive = {
+    def receiveWithLogging = {
      case RegisterExecutor(executorId, hostPort, cores) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
```
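One detail worth calling out in this file: `DriverActor` is an inner class that does not mix in `Logging` itself, so it satisfies the trait's abstract `log` member by pointing at the enclosing backend's logger, and its `[actor]` lines are attributed to the outer class's logger name. A minimal sketch of the same pattern, with made-up names (`MyBackend` and `EchoActor` are illustrative, not from the patch):
```scala
import akka.actor.Actor
import org.slf4j.Logger
import org.apache.spark.Logging
import org.apache.spark.util.ActorLogReceive

class MyBackend extends Logging {
  // Inner actor: reuse the outer component's logger rather than mixing in
  // Logging again, so the DEBUG [actor] lines appear under MyBackend's logger.
  class EchoActor extends Actor with ActorLogReceive {
    override protected def log: Logger = MyBackend.this.log

    override def receiveWithLogging = {
      case msg: String => sender ! msg  // echo back; logging is added by the trait
    }
  }
}
```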
core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
```diff
@@ -23,9 +23,9 @@ import akka.actor.{Actor, ActorRef, Props}
 
 import org.apache.spark.{Logging, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.{TaskMetrics, Executor, ExecutorBackend}
+import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.ActorLogReceive
 
 private case class ReviveOffers()
 
@@ -43,7 +43,7 @@ private case class StopExecutor()
 private[spark] class LocalActor(
     scheduler: TaskSchedulerImpl,
     executorBackend: LocalBackend,
-    private val totalCores: Int) extends Actor with Logging {
+    private val totalCores: Int) extends Actor with ActorLogReceive with Logging {
 
   private var freeCores = totalCores
 
@@ -53,7 +53,7 @@ private[spark] class LocalActor(
   val executor = new Executor(
     localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
 
-  def receive = {
+  override def receiveWithLogging = {
     case ReviveOffers =>
       reviveOffers()
 
```
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
```diff
@@ -31,15 +31,15 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
 
 /**
  * BlockManagerMasterActor is an actor on the master node to track statuses of
  * all slaves' block managers.
  */
 private[spark]
 class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
 
   // Mapping from block manager id to the block manager's information.
   private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
@@ -55,8 +55,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
     math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000))
 
-  val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs",
-    60000)
+  val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)
 
   var timeoutCheckingTask: Cancellable = null
 
@@ -67,9 +66,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     super.preStart()
   }
 
-  def receive = {
+  override def receiveWithLogging = {
     case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
-      logInfo("received a register")
       register(blockManagerId, maxMemSize, slaveActor)
       sender ! true
 
@@ -118,7 +116,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       sender ! true
 
     case StopBlockManagerMaster =>
-      logInfo("Stopping BlockManagerMaster")
       sender ! true
       if (timeoutCheckingTask != null) {
         timeoutCheckingTask.cancel()
```
core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
```diff
@@ -23,6 +23,7 @@ import akka.actor.{ActorRef, Actor}
 
 import org.apache.spark.{Logging, MapOutputTracker}
 import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ActorLogReceive
 
 /**
  * An actor to take commands from the master to execute options. For example,
@@ -32,12 +33,12 @@ private[storage]
 class BlockManagerSlaveActor(
     blockManager: BlockManager,
     mapOutputTracker: MapOutputTracker)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
 
   import context.dispatcher
 
   // Operations that involve removing blocks may be slow and should be done asynchronously
-  override def receive = {
+  override def receiveWithLogging = {
     case RemoveBlock(blockId) =>
       doAsync[Boolean]("removing block " + blockId, sender) {
         blockManager.removeBlock(blockId)
```
64 changes: 64 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util

import akka.actor.Actor
import org.slf4j.Logger

/**
 * A trait to enable logging all Akka actor messages. Here's an example of using this:
 *
 * {{{
 * class BlockManagerMasterActor extends Actor with ActorLogReceive with Logging {
 *   ...
 *   override def receiveWithLogging = {
 *     case GetLocations(blockId) =>
 *       sender ! getLocations(blockId)
 *     ...
 *   }
 *   ...
 * }
 * }}}
 *
 */
private[spark] trait ActorLogReceive {
  self: Actor =>

  override def receive: Actor.Receive = new Actor.Receive {

    private val _receiveWithLogging = receiveWithLogging

    override def isDefinedAt(o: Any): Boolean = _receiveWithLogging.isDefinedAt(o)

    override def apply(o: Any): Unit = {
      if (log.isDebugEnabled) {
        log.debug(s"[actor] received message $o from ${self.sender}")
      }
      val start = System.nanoTime
      _receiveWithLogging.apply(o)
      val timeTaken = (System.nanoTime - start).toDouble / 1000000
      if (log.isDebugEnabled) {
        log.debug(s"[actor] handled message ($timeTaken ms) $o from ${self.sender}")
      }
    }
  }

  def receiveWithLogging: Actor.Receive

  protected def log: Logger
}
```
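
To see how the pieces fit together, here is a hedged, self-contained sketch (the `Ping` message, `PingActor`, and `PingDemo` names are invented for illustration): an actor mixes in the trait, puts its handlers in `receiveWithLogging`, and the wrapped `receive` emits the `[actor] received` / `[actor] handled (N ms)` lines shown at the top of this commit once DEBUG logging is enabled.
```scala
import akka.actor.{Actor, ActorSystem, Props}
import org.apache.spark.Logging
import org.apache.spark.util.ActorLogReceive

// Illustrative message and actor, not part of the patch.
case class Ping(id: Int)

class PingActor extends Actor with ActorLogReceive with Logging {
  // Handlers go here; ActorLogReceive's receive wraps them with the
  // DEBUG logging and per-message timing.
  override def receiveWithLogging = {
    case Ping(id) => sender ! id
  }
}

object PingDemo extends App {
  val system = ActorSystem("demo")
  val ping = system.actorOf(Props(new PingActor), "ping")
  // With the PingActor logger set to DEBUG, this logs roughly:
  //   [actor] received message Ping(1) from Actor[...]
  //   [actor] handled message (0.x ms) Ping(1) from Actor[...]
  ping ! Ping(1)
  system.shutdown()
}
```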
