Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-1497. Fix scalastyle warnings in YARN, Hive code #413

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dev/scalastyle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#

echo -e "q\n" | sbt/sbt clean scalastyle > scalastyle.txt
# Check style with YARN alpha built too
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any interest in doing the hive one here too? I edited my comment earlier to show how to do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind - these are checked already.

SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
# Check style with YARN built too
SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
ERRORS=$(cat scalastyle.txt | grep -e "\<error\>")
if test ! -z "$ERRORS"; then
echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name"))
createCmds.foreach(_())

if (cacheTables)
if (cacheTables) {
cacheTable(name)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import org.apache.spark.scheduler.SplitInfo
class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {

def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
this(args, new Configuration(), sparkConf)

def this(args: ApplicationMasterArguments) = this(args, new SparkConf())

Expand All @@ -63,7 +64,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
// Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
// Send a hello message so that the connection is actually established and we can
// monitor Lifecycle Events.
driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
Expand Down Expand Up @@ -104,8 +106,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// Allocate all containers
allocateExecutors()

// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
// Launch a progress reporter thread, or else the app will be killed once the expiration
// timeout (def: 10mins) is reached. Ensure that progress is sent before
// YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.

val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// we want to be reasonably responsive without causing too many requests to RM.
Expand Down Expand Up @@ -163,8 +166,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
// Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
// Users can then monitor stderr/stdout on that node if required.
// Setting this to master host,port - so that the ApplicationReport at client has
// some sensible info. Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
// What do we provide here ? Might make sense to expose something sensible later ?
Expand Down Expand Up @@ -213,7 +216,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
Thread.sleep(100)
}

Expand All @@ -230,7 +234,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
while (!driverClosed) {
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
logInfo("Allocating " + missingExecutorCount +
" containers to make up for (potentially ?) lost containers")
yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ private[yarn] class YarnAllocationHandler(
val executorHostname = container.getNodeId.getHost
val containerId = container.getId

assert(
container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
assert( container.getResource.getMemory >=
(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))

if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
Expand Down Expand Up @@ -393,9 +393,10 @@ private[yarn] class YarnAllocationHandler(

// default.
if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
resourceRequests = List(
createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
logDebug("numExecutors: " + numExecutors + ", host preferences: " +
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
}
else {
// request for all hosts in preferred nodes and for numExecutors -
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)

val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
System.setProperty(
"spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
}

/** Get the Yarn approved local directories. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
// Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
// Send a hello message so that the connection is actually established
// and we can monitor Lifecycle Events.
driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
Expand Down Expand Up @@ -95,8 +96,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// Allocate all containers
allocateExecutors()

// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
// Launch a progress reporter thread, or else the app will be killed once the expiration
// timeout (def: 10mins) is reached. Ensure that progress is sent before
// YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.

val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// we want to be reasonably responsive without causing too many requests to RM.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ private[yarn] class YarnAllocationHandler(
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname))
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
driverUrl, executorHostname))
val executorRunnable = new ExecutorRunnable(
container,
conf,
Expand Down Expand Up @@ -314,8 +315,8 @@ private[yarn] class YarnAllocationHandler(
// `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
} else {
// Decrement the number of executors running. The next iteration of the ApplicationMaster's
// reporting thread will take care of allocating.
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
Expand Down