Refactor to make attemptId optional in the API. Also added tests.
harishreedharan committed May 27, 2015
1 parent 0fc1424 commit a48b91f
Showing 13 changed files with 188 additions and 86 deletions.
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

@@ -17,8 +17,6 @@

 package org.apache.spark.deploy.history
 
-import java.io.{OutputStream, File}
-
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.ui.SparkUI
@@ -67,8 +65,11 @@ private[history] abstract class ApplicationHistoryProvider {
   def getConfig(): Map[String, String] = Map()
 
   /**
-   * Get the [[Path]]s to the Event log directories.
+   * Get the [[Path]]s to the event log files. For legacy event log directories, the directory
+   * path itself is returned; the caller is responsible for listing the files and using them as
+   * needed. If attemptId is [[None]], the event logs of all attempts for the given application
+   * are returned, to be served as a single zip file.
    */
-  def getEventLogPaths(appId: String, attemptId: String): Seq[Path] = Seq.empty
+  def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = Seq.empty
 
 }
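The sketch below shows the call shape this change enables: `Some(id)` asks for one attempt's logs, `None` for all of them. `DemoProvider` and its paths are hypothetical; only the `getEventLogPaths` signature mirrors the commit.

```scala
// Self-contained sketch of the new optional-attemptId contract.
import org.apache.hadoop.fs.Path

object OptionalAttemptIdDemo {
  class DemoProvider {
    def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] =
      attemptId match {
        // A concrete attempt id selects just that attempt's log.
        case Some(id) => Seq(new Path(s"/eventlogs/${appId}_$id"))
        // None selects the logs of every attempt.
        case None => Seq(new Path(s"/eventlogs/${appId}_1"), new Path(s"/eventlogs/${appId}_2"))
      }
  }

  def main(args: Array[String]): Unit = {
    val provider = new DemoProvider
    println(provider.getEventLogPaths("app-1", Some("2"))) // one attempt
    println(provider.getEventLogPaths("app-1", None))      // all attempts
  }
}
```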
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

@@ -17,15 +17,14 @@

 package org.apache.spark.deploy.history
 
-import java.io.{OutputStream, FileOutputStream, File, BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{File, FileOutputStream, BufferedInputStream, FileNotFoundException, IOException, InputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
-import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -221,27 +220,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getEventLogPaths(
-      appId: String,
-      attemptId: String): Seq[Path] = {
-
-    var filePaths = new ArrayBuffer[Path]()
+  override def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = {
+    val filePaths = new ArrayBuffer[Path]()
     applications.get(appId).foreach { appInfo =>
-      appInfo.attempts.find { attempt =>
-        if (attempt.attemptId.isDefined && attempt.attemptId.get == attemptId) true
-        else false
-      }.foreach { attempt =>
-        val remotePath = new Path(logDir, attempt.logPath)
-        if (isLegacyLogDirectory(fs.getFileStatus(remotePath))) {
-          val filesIter = fs.listFiles(remotePath, true)
-          while (filesIter.hasNext) {
-            filePaths += filesIter.next().getPath
-          }
-        } else {
-          filePaths += remotePath
-        }
-      }
+      // If no attemptId is given, or the attempts themselves carry no attemptId,
+      // return the logs of every attempt.
+      appInfo.attempts.filter { attempt =>
+        attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
+      }.foreach { attempt => filePaths += new Path(logDir, attempt.logPath) }
     }
     filePaths
   }
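The filter encodes a three-way rule. A minimal runnable sketch, with a hypothetical object name, spells out its truth table:

```scala
// Sketch of the attempt-matching rule applied to appInfo.attempts above.
object AttemptFilterDemo {
  def matches(recorded: Option[String], requested: Option[String]): Boolean =
    recorded.isEmpty || requested.isEmpty || recorded.get == requested.get

  def main(args: Array[String]): Unit = {
    println(matches(Some("1"), Some("1"))) // true: ids agree
    println(matches(Some("1"), Some("2"))) // false: a different attempt
    println(matches(Some("1"), None))      // true: all attempts requested
    println(matches(None, Some("2")))      // true: unnamed attempts always match
  }
}
```

Note the last case: an attempt stored without an attemptId is returned even when a specific attempt is requested, which covers single-attempt applications whose logs predate attempt ids.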
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

@@ -174,13 +174,10 @@ class HistoryServer(
     getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
-  def getEventLogPaths(
-      appId: String,
-      attemptId: String): Seq[Path] = {
+  override def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = {
     provider.getEventLogPaths(appId, attemptId)
   }
 
-
   /**
    * Returns the provider configuration to show in the listing page.
    *
core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala

@@ -22,6 +22,7 @@ import javax.ws.rs.core.{Context, Response}

 import com.sun.jersey.api.core.ResourceConfig
 import com.sun.jersey.spi.container.servlet.ServletContainer
+import org.apache.hadoop.fs
 import org.eclipse.jetty.server.handler.ContextHandler
 import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
@@ -204,6 +205,7 @@ private[spark] object ApiRootResource {
 private[spark] trait UIRoot {
   def getSparkUI(appKey: String): Option[SparkUI]
   def getApplicationInfoList: Iterator[ApplicationInfo]
+  def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[fs.Path] = Seq.empty
 
   /**
    * Get the spark UI with the given appID, and apply a function
core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala

@@ -16,7 +16,7 @@
  */
 package org.apache.spark.status.api.v1
 
-import java.io.{BufferedInputStream, FileInputStream, OutputStream}
+import java.io.{File, FileOutputStream, BufferedInputStream, FileInputStream, OutputStream}
 import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.{Response, StreamingOutput, MediaType}
 
@@ -41,36 +41,17 @@ private[v1] class EventLogDownloadResource(
     uIRoot match {
       case hs: HistoryServer =>
         var logsNotFound = false
-        val fileName: String = {
+        val fileName = {
           attemptId match {
             case Some(id) => s"eventLogs-$appId-$id.zip"
             case None => s"eventLogs-$appId.zip"
           }
         }
         val stream = new StreamingOutput {
           override def write(output: OutputStream): Unit = {
-            attemptId match {
-              case Some(id) =>
-                Utils.zipFilesToStream(hs.getEventLogPaths(appId, id), conf, output)
-              case None =>
-                val appInfo = hs.getApplicationInfoList.find(_.id == appId)
-                appInfo match {
-                  case Some(info) =>
-                    val attempts = info.attempts
-                    val files = new ArrayBuffer[Path]
-                    attempts.foreach { attempt =>
-                      attempt.attemptId.foreach { attemptId =>
-                        logInfo(s"Attempt found: ${attemptId}")
-                        files ++= hs.getEventLogPaths(appId, attemptId)
-                      }
-                    }
-                    if (files.nonEmpty) {
-                      Utils.zipFilesToStream(files, conf, output)
-                    }
-                  case None => logsNotFound = true
-                }
-            }
-            output.flush()
+            val eventLogs = hs.getEventLogPaths(appId, attemptId)
+            if (eventLogs.isEmpty) logsNotFound = true
+            else zipLogFiles(eventLogs, output)
           }
         }
         if (logsNotFound) {
@@ -86,16 +67,44 @@ private[v1] class EventLogDownloadResource(
         }
       case _ =>
         Response.serverError()
-          .entity("History Server is not running - cannot return event logs.")
+          .entity("Event logs are only available through the history server.")
           .status(Response.Status.SERVICE_UNAVAILABLE)
           .build()
     }
   }
-}
-
-private[v1] object EventLogDownloadResource {
-
-  def unapply(resource: EventLogDownloadResource): Option[(UIRoot, String)] = {
-    Some((resource.uIRoot, resource.appId))
+
+  private def zipLogFiles(eventLogs: Seq[Path], output: OutputStream): Unit = {
+    val areLegacyLogs = eventLogs.headOption.exists { path =>
+      path.getFileSystem(conf).isDirectory(path)
+    }
+    val pathsToZip = if (areLegacyLogs) {
+      new ArrayBuffer[Path]()
+    } else {
+      eventLogs
+    }
+    var tempDir: File = null
+    try {
+      if (areLegacyLogs) {
+        tempDir = Utils.createTempDir()
+        Utils.chmod700(tempDir)
+        eventLogs.foreach { logPath =>
+          // If the event logs are directories (legacy), then create a zip file for each
+          // one and write each of these files to the eventual output.
+          val fs = logPath.getFileSystem(conf)
+          val logFiles = fs.listFiles(logPath, true)
+          val zipFile = new File(tempDir, logPath.getName + ".zip")
+          pathsToZip.asInstanceOf[ArrayBuffer[Path]] += new Path(zipFile.toURI)
+          val outputStream = new FileOutputStream(zipFile)
+          val paths = new ArrayBuffer[Path]()
+          while (logFiles.hasNext) {
+            paths += logFiles.next().getPath
+          }
+          Utils.zipFilesToStream(paths, conf, outputStream)
+        }
+      }
+      Utils.zipFilesToStream(pathsToZip, conf, output)
+    } finally {
+      if (tempDir != null) Utils.deleteRecursively(tempDir)
+    }
   }
 }
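Because `write` streams the zip straight into the HTTP response, a client can save it with plain JDK networking. A hedged sketch follows; the host, port, and URL path are assumptions (the route annotations are not part of this diff), so adjust them to the actual deployment:

```scala
// Hypothetical client: download the zipped event logs of one application.
import java.io.{FileOutputStream, InputStream}
import java.net.URL

object DownloadEventLogs {
  def main(args: Array[String]): Unit = {
    // Assumed route and default history server port; verify against your setup.
    val url = new URL("http://localhost:18080/api/v1/applications/app-1/logs")
    val in: InputStream = url.openStream()
    val out = new FileOutputStream("eventLogs-app-1.zip")
    try {
      val buffer = new Array[Byte](8192)
      var length = in.read(buffer)
      while (length != -1) {
        out.write(buffer, 0, length)
        length = in.read(buffer)
      }
    } finally {
      in.close()
      out.close()
    }
  }
}
```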
36 changes: 21 additions & 15 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
-import java.util.zip.{ZipOutputStream, ZipEntry}
+import java.util.zip.{ZipEntry, ZipOutputStream}
 import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
@@ -794,23 +794,29 @@ private[spark] object Utils extends Logging {

     val fs = FileSystem.get(hadoopConf)
     val buffer = new Array[Byte](64 * 1024)
-    val zipStream = new ZipOutputStream(outputStream)
-    files.foreach { remotePath =>
-      val inputStream = fs.open(remotePath, 1 * 1024 * 1024) // 1MB Buffer
-      zipStream.putNextEntry(new ZipEntry(remotePath.getName))
-      var dataRemaining = true
-      while (dataRemaining) {
-        val length = inputStream.read(buffer)
-        if (length != -1) {
-          zipStream.write(buffer, 0, length)
-        } else {
-          dataRemaining = false
-        }
-      }
-      zipStream.closeEntry()
-      inputStream.close()
-    }
-    zipStream.close()
+    val zipStream = Some(new ZipOutputStream(outputStream))
+    try {
+      files.foreach { remotePath =>
+        val inputStream = Some(fs.open(remotePath, 1 * 1024 * 1024)) // 1MB Buffer
+        try {
+          zipStream.get.putNextEntry(new ZipEntry(remotePath.getName))
+          var dataRemaining = true
+          while (dataRemaining) {
+            val length = inputStream.get.read(buffer)
+            if (length != -1) {
+              zipStream.get.write(buffer, 0, length)
+            } else {
+              dataRemaining = false
+            }
+          }
+          zipStream.get.closeEntry()
+        } finally {
+          inputStream.foreach(_.close())
+        }
+      }
+    } finally {
+      zipStream.foreach(_.close())
+    }
}

/**
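For comparison, here is the same copy-loop-with-cleanup pattern on purely local files, as a standalone sketch with hypothetical file names. Plain non-null references with try/finally give the same close guarantee as the Option wrapping used in the diff:

```scala
// Zip a couple of local files into /tmp/demo.zip, closing streams in finally.
import java.io.{BufferedInputStream, FileInputStream, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

object ZipToStreamDemo {
  def main(args: Array[String]): Unit = {
    val zipStream = new ZipOutputStream(new FileOutputStream("/tmp/demo.zip"))
    try {
      Seq("/tmp/a.log", "/tmp/b.log").foreach { name =>
        val in = new BufferedInputStream(new FileInputStream(name))
        try {
          // One zip entry per input file, named after the file itself.
          zipStream.putNextEntry(new ZipEntry(name.split('/').last))
          val buffer = new Array[Byte](64 * 1024)
          var length = in.read(buffer)
          while (length != -1) {
            zipStream.write(buffer, 0, length)
            length = in.read(buffer)
          }
          zipStream.closeEntry()
        } finally {
          in.close()
        }
      }
    } finally {
      zipStream.close()
    }
  }
}
```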
@@ -41,6 +41,22 @@
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1430917381535",
"name" : "Spark shell",
"attempts" : [ {
"attemptId" : "1",
"startTime" : "2015-05-06T13:03:00.893GMT",
"endTime" : "2015-02-03T16:42:46.912GMT",
"sparkUser" : "irashid",
"completed" : true
}, {
"attemptId" : "2",
"startTime" : "2015-05-06T13:03:00.893GMT",
"endTime" : "2015-02-03T16:42:46.912GMT",
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1422981759269",
"name" : "Spark shell",
@@ -41,6 +41,22 @@
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1430917381535",
"name" : "Spark shell",
"attempts" : [ {
"attemptId" : "1",
"startTime" : "2015-05-06T13:03:00.893GMT",
"endTime" : "2015-02-03T16:42:46.912GMT",
"sparkUser" : "irashid",
"completed" : true
}, {
"attemptId" : "2",
"startTime" : "2015-05-06T13:03:00.893GMT",
"endTime" : "2015-02-03T16:42:46.912GMT",
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1422981759269",
"name" : "Spark shell",
@@ -24,11 +24,29 @@
"completed" : true
} ]
}, {
"id" : "local-1425081759269",
"id": "local-1425081759269",
"name": "Spark shell",
"attempts": [
{
"startTime": "2015-02-28T00:02:38.277GMT",
"endTime": "2015-02-28T00:02:46.912GMT",
"sparkUser": "irashid",
"completed": true
}
]
}, {
"id" : "local-1430917381535",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2015-02-28T00:02:38.277GMT",
"endTime" : "2015-02-28T00:02:46.912GMT",
"attemptId" : "1",
"startTime" : "2015-05-06T13:03:00.893GMT",
"endTime" : "2015-02-03T16:42:46.912GMT",
"sparkUser" : "irashid",
"completed" : true
}, {
"attemptId" : "2",
"startTime" : "2015-05-06T13:03:00.893GMT",
"endTime" : "2015-02-03T16:42:46.912GMT",
"sparkUser" : "irashid",
"completed" : true
} ]
5 changes: 5 additions & 0 deletions core/src/test/resources/spark-events/local-1430917381535_1
@@ -0,0 +1,5 @@
{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917381651}
{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle 
Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380893,"User":"irashid","App Attempt ID":"1"}
{"Event":"SparkListenerApplicationEnd","Timestamp":1422981766912}