Commit 0fc1424

File download now works for individual attempts and the entire application.
harishreedharan committed May 22, 2015
1 parent 350d7e8 commit 0fc1424
Showing 8 changed files with 168 additions and 100 deletions.
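The download is exposed through the History Server's REST API (see the ApiRootResource change below). Assuming the usual /api/v1 mount point, the new endpoints are GET applications/{appId}/download for all attempts of an application and GET applications/{appId}/{attemptId}/download for a single attempt; both return the event logs as a zip archive served as an octet stream.
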
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -17,7 +17,9 @@

package org.apache.spark.deploy.history

import java.io.File
import java.io.{OutputStream, File}

import org.apache.hadoop.fs.Path

import org.apache.spark.ui.SparkUI

@@ -65,9 +67,8 @@ private[history] abstract class ApplicationHistoryProvider {
def getConfig(): Map[String, String] = Map()

/**
* Get the event logs for the given application. The event logs are compressed into a zip file
* and copied into the directory passed in.
* Get the [[Path]]s to the event log files for the given application attempt.
*/
def copyApplicationEventLogs(appId: String, directory: File): Unit = { }
def getEventLogPaths(appId: String, attemptId: String): Seq[Path] = Seq.empty

}
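For illustration only (not part of this commit): since the new hook defaults to Seq.empty, a concrete provider can override it roughly as below; the other members are omitted and the event log location is invented.

// Hypothetical override in a concrete ApplicationHistoryProvider subclass;
// the path layout is made up for illustration.
override def getEventLogPaths(appId: String, attemptId: String): Seq[Path] =
  Seq(new Path(s"/tmp/spark-events/$appId-$attemptId"))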
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,15 +17,15 @@

package org.apache.spark.deploy.history

import java.io.{FileOutputStream, File, BufferedInputStream, FileNotFoundException,
IOException, InputStream}
import java.io.{OutputStream, FileOutputStream, File, BufferedInputStream, FileNotFoundException, IOException, InputStream}
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, Path}
import org.apache.hadoop.fs.permission.AccessControlException

import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -221,44 +221,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

override def copyApplicationEventLogs(appId: String, directory: File): Unit = {
val buffer = new Array[Byte](64 * 1024)
/**
* Copy the data from the path specified into a new [[ZipEntry]] with the remotePath's name.
*/
def copyToZipStream(remotePath: Path, zipStream: ZipOutputStream): Unit = {
val inputStream = fs.open(remotePath, 1 * 1024 * 1024) // 1MB Buffer
zipStream.putNextEntry(new ZipEntry(remotePath.getName))
var dataRemaining = true
while (dataRemaining) {
val length = inputStream.read(buffer)
if (length != -1) {
zipStream.write(buffer, 0, length)
} else {
dataRemaining = false
}
}
zipStream.closeEntry()
inputStream.close()
}
override def getEventLogPaths(
appId: String,
attemptId: String): Seq[Path] = {

var filePaths = new ArrayBuffer[Path]()

applications.get(appId).foreach { appInfo =>
val outFile = new File(directory, s"eventLogs-$appId.zip")
val zipStream = new ZipOutputStream(new FileOutputStream(outFile))
appInfo.attempts.foreach { attempt =>
appInfo.attempts.find { attempt =>
if (attempt.attemptId.isDefined && attempt.attemptId.get == attemptId) true
else false
}.foreach { attempt =>
val remotePath = new Path(logDir, attempt.logPath)
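// Legacy applications logged events as a directory of files rather than a single file,
// so collect every file under that directory.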
if (isLegacyLogDirectory(fs.getFileStatus(remotePath))) {
val filesIter = fs.listFiles(remotePath, true)
while (filesIter.hasNext) {
copyToZipStream(filesIter.next().getPath, zipStream)
filePaths += filesIter.next().getPath
}
} else {
copyToZipStream(remotePath, zipStream)
filePaths += remotePath
}
}
zipStream.finish()
zipStream.close()
}
filePaths
}


core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,11 +17,11 @@

package org.apache.spark.deploy.history

import java.io.File
import java.util.NoSuchElementException
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import com.google.common.cache._
import org.apache.hadoop.fs.Path
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -174,9 +174,10 @@ class HistoryServer(
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}

def copyEventLogsToDirectory(appId: String, destDir: File): Boolean = {
provider.copyApplicationEventLogs(appId, destDir)
true
def getEventLogPaths(
appId: String,
attemptId: String): Seq[Path] = {
provider.getEventLogPaths(appId, attemptId)
}


core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -164,6 +164,18 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
}
}

@Path("applications/{appId}/download")
def getEventLogs(
@PathParam("appId") appId: String): EventLogDownloadResource = {
new EventLogDownloadResource(uiRoot, appId, None)
}

@Path("applications/{appId}/{attemptId}/download")
def getEventLogs(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
new EventLogDownloadResource(uiRoot, appId, Some(attemptId))
}
}
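For illustration only (not part of this commit), a client could pull the attempt-level zip like this; the host, port, and application/attempt IDs are hypothetical, and /api/v1 is the prefix under which this resource is assumed to be mounted.

// Example sketch (not part of this commit): download the event log zip for one attempt.
import java.io.{BufferedInputStream, FileOutputStream}
import java.net.URL

object DownloadEventLogsExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical History Server address and IDs.
    val url = new URL("http://localhost:18080/api/v1/applications/app-1234/attempt1/download")
    val in = new BufferedInputStream(url.openStream())
    val out = new FileOutputStream("eventLogs-app-1234-attempt1.zip")
    val buffer = new Array[Byte](64 * 1024)
    var read = in.read(buffer)
    while (read != -1) {
      out.write(buffer, 0, read)
      read = in.read(buffer)
    }
    out.close()
    in.close()
  }
}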

private[spark] object ApiRootResource {
core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
@@ -20,50 +20,70 @@ import java.io.{BufferedInputStream, FileInputStream, OutputStream}
import javax.ws.rs.{GET, Produces}
import javax.ws.rs.core.{Response, StreamingOutput, MediaType}

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.util.Utils

@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
private[v1] class EventLogDownloadResource(val uIRoot: UIRoot, val appId: String) {
private[v1] class EventLogDownloadResource(
val uIRoot: UIRoot,
val appId: String,
val attemptId: Option[String]) extends Logging {
val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf)

@GET
def getEventLogs(): Response = {
uIRoot match {
case hs: HistoryServer =>
val dir = Utils.createTempDir()
Utils.chmod700(dir)
hs.copyEventLogsToDirectory(appId, dir)
dir.listFiles().headOption.foreach { file =>
val stream = new StreamingOutput {
override def write(output: OutputStream): Unit = {
try {
val inStream = new BufferedInputStream(new FileInputStream(file))
val buffer = new Array[Byte](1024 * 1024)
var dataRemains = true
while (dataRemains) {
val read = inStream.read(buffer)
if (read > 0) {
output.write(buffer, 0, read)
} else {
dataRemains = false
}
var logsNotFound = false
val fileName: String = {
attemptId match {
case Some(id) => s"eventLogs-$appId-$id.zip"
case None => s"eventLogs-$appId.zip"
}
}
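// StreamingOutput defers the work: the JAX-RS runtime invokes write() while sending the
// response body, so the zip is streamed straight to the client instead of being staged in
// a temporary directory as the previous implementation did.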
val stream = new StreamingOutput {
override def write(output: OutputStream): Unit = {
attemptId match {
case Some(id) =>
Utils.zipFilesToStream(hs.getEventLogPaths(appId, id), conf, output)
case None =>
val appInfo = hs.getApplicationInfoList.find(_.id == appId)
appInfo match {
case Some(info) =>
val attempts = info.attempts
val files = new ArrayBuffer[Path]
attempts.foreach { attempt =>
attempt.attemptId.foreach { attemptId =>
logInfo(s"Attempt found: ${attemptId}")
files ++= hs.getEventLogPaths(appId, attemptId)
}
}
if (files.nonEmpty) {
Utils.zipFilesToStream(files, conf, output)
}
case None => logsNotFound = true
}
output.flush()
} finally {
Utils.deleteRecursively(dir)
}
}
output.flush()
}
return Response.ok(stream)
.header("Content-Length", file.length().toString)
.header("Content-Disposition", s"attachment; filename=${file.getName}")
}
if (logsNotFound) {
Response.serverError()
.entity(s"Event logs are not available for app: $appId.")
.status(Response.Status.SERVICE_UNAVAILABLE)
.build()
} else {
Response.ok(stream)
.header("Content-Disposition", s"attachment; filename=$fileName")
.header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
.build()
}
Response.serverError()
.entity(s"Event logs for $appId not found.")
.status(Response.Status.NOT_FOUND)
.build()
case _ =>
Response.serverError()
.entity("History Server is not running - cannot return event logs.")
37 changes: 37 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,6 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.zip.{ZipOutputStream, ZipEntry}
import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
@@ -776,6 +777,42 @@ private[spark] object Utils extends Logging {
localRootDirs = null
}

/**
* This method compresses the files passed in, and writes the compressed data out into the
* [[OutputStream]] passed in. Each file is written as a new [[ZipEntry]] with its name being
* the name of the file being compressed.
*/
private[spark] def zipFilesToStream(
files: Seq[Path],
hadoopConf: Configuration,
outputStream: OutputStream): Unit = {

// Accepting an output stream avoids staging the compressed data in an intermediate file
// that would then have to be read back; when the data needs to be sent over the wire, the
// destination output stream can be passed in here directly.

val fs = FileSystem.get(hadoopConf)
val buffer = new Array[Byte](64 * 1024)
val zipStream = new ZipOutputStream(outputStream)
files.foreach { remotePath =>
val inputStream = fs.open(remotePath, 1 * 1024 * 1024) // 1MB Buffer
zipStream.putNextEntry(new ZipEntry(remotePath.getName))
var dataRemaining = true
while (dataRemaining) {
val length = inputStream.read(buffer)
if (length != -1) {
zipStream.write(buffer, 0, length)
} else {
dataRemaining = false
}
}
zipStream.closeEntry()
inputStream.close()
}
zipStream.close()
}
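// Example sketch (not part of this commit): zip two made-up event log paths into a local
// file. Fully qualified names are used to avoid assuming extra imports in this file.
def zipEventLogsExample(): Unit = {
  val hadoopConf = org.apache.spark.deploy.SparkHadoopUtil.get.newConfiguration(
    new org.apache.spark.SparkConf())
  val logs = Seq(
    new Path("/user/spark/eventLogs/app-1234"),
    new Path("/user/spark/eventLogs/app-5678"))
  val zipOut = new FileOutputStream("eventLogs.zip")
  try {
    zipFilesToStream(logs, hadoopConf, zipOut)
  } finally {
    zipOut.close()
  }
}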

/**
* Shuffle the elements of a collection into a random order, returning the
* result in a new collection. Unlike scala.util.Random.shuffle, this method
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy.history

import java.io.{FileInputStream, BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
import java.io.{ByteArrayOutputStream, FileInputStream, BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
import java.net.URI
import java.util.concurrent.TimeUnit
import java.util.zip.ZipInputStream
@@ -382,39 +382,15 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
.currentTimeMillis() - 400, "test", Some("attempt1")),
SparkListenerApplicationEnd(System.currentTimeMillis() - 200)
)
val log1Buffer = getFileContent(log1)
val log2 = newLogFile("downloadApp1", Some("attempt2"), inProgress = false)
writeFile(log2, true, None,
SparkListenerApplicationStart("downloadApp1", Some("downloadApp1"), System
.currentTimeMillis() - 100, "test", Some("attempt2")),
SparkListenerApplicationEnd(System.currentTimeMillis())
)
val log2Buffer = getFileContent(log2)
provider.checkForLogs()
var inputDir: File = null
var outputDir: File = null
try {
inputDir = Utils.createTempDir()
Utils.chmod700(inputDir)
outputDir = Utils.createTempDir()
Utils.chmod700(outputDir)
provider.copyApplicationEventLogs("downloadApp1", inputDir)
val zipFile = inputDir.listFiles.headOption
zipFile.foreach { file =>
unzipToDir(file, outputDir)
}
var filesCompared = 0
outputDir.listFiles().foreach { outputFile =>
val bufferToCompare = if (outputFile.getName == log1.getName) log1Buffer else log2Buffer
val result = getFileContent(outputFile)
result should equal (bufferToCompare)
filesCompared += 1
}
assert(filesCompared === 2)
} finally {
if (inputDir != null) Utils.deleteRecursively(inputDir)
if (outputDir != null) Utils.deleteRecursively(outputDir)
}
provider.getEventLogPaths("downloadApp1", "attempt1").head.getName should be (log1.getName)
provider.getEventLogPaths("downloadApp1", "attempt2").head.getName should be (log2.getName)
}

/**