
Commit

Ensure that Path does not appear in interfaces, by refactoring interfaces.
harishreedharan committed May 28, 2015
1 parent 5a5f3e2 commit 1100b40
Showing 9 changed files with 178 additions and 161 deletions.
@@ -17,8 +17,9 @@

package org.apache.spark.deploy.history

import org.apache.hadoop.fs.Path
import java.io.OutputStream

import org.apache.spark.SparkException
import org.apache.spark.ui.SparkUI

private[spark] case class ApplicationAttemptInfo(
@@ -65,11 +66,11 @@ private[history] abstract class ApplicationHistoryProvider {
def getConfig(): Map[String, String] = Map()

/**
* Get the [[Path]]s to the Event log files. For legacy event log directories, directory path
* itself is returned. The caller is responsible for listing the files and using them as needed.
* If the attemptId is [[None]], event logs corresponding to all attempts for the given
* application are downloaded as a single zip file.
* Writes out the event logs to the output stream provided. The logs will be compressed into a
* single zip file and written out.
* @throws SparkException if the logs for the app id cannot be found.
*/
def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = Seq.empty
@throws(classOf[SparkException])
def writeEventLogs(appId: String, attemptId: Option[String], outputStream: OutputStream): Unit

}
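For context, a hedged sketch of a caller of the new contract (not part of this commit; the helper name and local file name are invented, and ApplicationHistoryProvider is assumed to be in scope within org.apache.spark.deploy.history):

import java.io.{BufferedOutputStream, FileOutputStream}

import org.apache.spark.SparkException

// Hypothetical caller: stream the event logs of an application into a local zip file.
def downloadAppLogs(provider: ApplicationHistoryProvider, appId: String): Unit = {
  val out = new BufferedOutputStream(new FileOutputStream(s"eventLogs-$appId.zip"))
  try {
    // Passing None for attemptId requests the logs of every attempt as a single zip.
    provider.writeEventLogs(appId, None, out)
  } catch {
    case e: SparkException =>
      // Thrown when no logs exist for the given application id.
      println(s"No event logs found for $appId: ${e.getMessage}")
  } finally {
    out.close()
  }
}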
@@ -17,17 +17,18 @@

package org.apache.spark.deploy.history

import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
import java.io.{OutputStream, FileOutputStream, File, BufferedInputStream,
FileNotFoundException, IOException, InputStream}
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.AccessControlException

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
@@ -60,7 +61,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.map { d => Utils.resolveURI(d).toString }
.getOrElse(DEFAULT_LOG_DIR)

private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private val fs = Utils.getHadoopFileSystem(logDir, hadoopConf)

// Used by check event thread and clean log thread.
// Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -220,16 +222,49 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

override def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = {
override def writeEventLogs(
appId: String,
attemptId: Option[String],
outputStream: OutputStream): Unit = {

val filePaths = new ArrayBuffer[Path]()
applications.get(appId).foreach { appInfo =>
// If no attempt is specified, or there is no attemptId for attempts, return all attempts
appInfo.attempts.filter { attempt =>
attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
}.foreach { attempt => filePaths += new Path(logDir, attempt.logPath) }
applications.get(appId) match {
case Some(appInfo) =>
val dirsToClear = new mutable.ArrayBuffer[File]()
try {
// If no attempt is specified, or there is no attemptId for attempts, return all attempts
val pathsToZip = appInfo.attempts.filter { attempt =>
attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
}.map { attempt =>
val logPath = new Path(logDir, attempt.logPath)
if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
val localDir = Utils.createTempDir()
Utils.chmod700(localDir)
dirsToClear += localDir
val outputFile = new File(localDir, logPath.getName)
val outputStream = new FileOutputStream(outputFile)
val files = fs.listFiles(logPath, false)
val paths = new mutable.ArrayBuffer[Path]()
while (files.hasNext) {
paths += files.next().getPath
}
Utils.zipFilesToStream(paths, hadoopConf, outputStream)
new Path(outputFile.toURI)
} else {
new Path(logDir, attempt.logPath)
}
}
Utils.zipFilesToStream(pathsToZip, hadoopConf, outputStream)
} finally {
dirsToClear.foreach { dir =>
try {
Utils.deleteRecursively(dir)
} catch {
case NonFatal(e) => logWarning(s"Error while attempting to delete $dir.")
}
}
}
case None => throw new SparkException(s"Logs for $appId not found.")
}
filePaths
}


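The attempt filter in the new writeEventLogs is easy to misread, so here is a minimal standalone sketch of its behaviour (the predicate name and sample values are invented for illustration):

// Illustrative only: `stored` plays the role of an attempt's own attemptId,
// `requested` the method's attemptId parameter.
def includeAttempt(stored: Option[String], requested: Option[String]): Boolean =
  stored.isEmpty || requested.isEmpty || stored.get == requested.get

includeAttempt(Some("1"), None)       // true: no specific attempt requested, include all
includeAttempt(Some("1"), Some("1"))  // true: ids match
includeAttempt(Some("1"), Some("2"))  // false: ids differ
includeAttempt(None, Some("2"))       // true: legacy attempt that never recorded an id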
@@ -17,11 +17,11 @@

package org.apache.spark.deploy.history

import java.io.OutputStream
import java.util.NoSuchElementException
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import com.google.common.cache._
import org.apache.hadoop.fs.Path
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -174,8 +174,11 @@ class HistoryServer(
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}

override def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = {
provider.getEventLogPaths(appId, attemptId)
override def writeEventLogs(
appId: String,
attemptId: Option[String],
outputStream: OutputStream): Unit = {
provider.writeEventLogs(appId, attemptId, outputStream)
}

/**
@@ -16,13 +16,13 @@
*/
package org.apache.spark.status.api.v1

import java.io.OutputStream
import javax.servlet.ServletContext
import javax.ws.rs._
import javax.ws.rs.core.{Context, Response}

import com.sun.jersey.api.core.ResourceConfig
import com.sun.jersey.spi.container.servlet.ServletContainer
import org.apache.hadoop.fs
import org.eclipse.jetty.server.handler.ContextHandler
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

@@ -165,13 +165,13 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
}
}

@Path("applications/{appId}/download")
@Path("applications/{appId}/logs")
def getEventLogs(
@PathParam("appId") appId: String): EventLogDownloadResource = {
new EventLogDownloadResource(uiRoot, appId, None)
}

@Path("applications/{appId}/{attemptId}/download")
@Path("applications/{appId}/{attemptId}/logs")
def getEventLogs(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
@@ -205,7 +205,10 @@ private[spark] object ApiRootResource {
private[spark] trait UIRoot {
def getSparkUI(appKey: String): Option[SparkUI]
def getApplicationInfoList: Iterator[ApplicationInfo]
def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[fs.Path] = Seq.empty
def writeEventLogs(
appId: String,
attemptId: Option[String],
outputStream: OutputStream): Unit = { }

/**
* Get the spark UI with the given appID, and apply a function
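The download endpoints above are renamed from .../download to .../logs. A hedged sketch of fetching the zipped logs over HTTP, assuming the history server's default address and an invented application id (the REST API root /api/v1 matches ApiRootResource, but the exact values here are placeholders):

import java.io.{BufferedInputStream, FileOutputStream}
import java.net.URL

// Hypothetical client: download the zipped event logs from the renamed endpoint.
val url = new URL("http://localhost:18080/api/v1/applications/app-20150528120000-0000/logs")
val in = new BufferedInputStream(url.openStream())
val out = new FileOutputStream("eventLogs.zip")
try {
  val buffer = new Array[Byte](64 * 1024)
  Iterator.continually(in.read(buffer)).takeWhile(_ != -1).foreach { n =>
    out.write(buffer, 0, n)
  }
} finally {
  in.close()
  out.close()
}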
@@ -16,18 +16,15 @@
*/
package org.apache.spark.status.api.v1

import java.io.{File, FileOutputStream, OutputStream}
import java.io.OutputStream
import javax.ws.rs.{GET, Produces}
import javax.ws.rs.core.{MediaType, Response, StreamingOutput}

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import scala.util.control.NonFatal

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.util.Utils

@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
private[v1] class EventLogDownloadResource(
@@ -40,30 +37,29 @@ private[v1] class EventLogDownloadResource(
def getEventLogs(): Response = {
uIRoot match {
case hs: HistoryServer =>
var logsNotFound = false
val fileName = {
attemptId match {
case Some(id) => s"eventLogs-$appId-$id.zip"
case None => s"eventLogs-$appId.zip"
try {
val fileName = {
attemptId match {
case Some(id) => s"eventLogs-$appId-$id.zip"
case None => s"eventLogs-$appId.zip"
}
}
}
val stream = new StreamingOutput {
override def write(output: OutputStream): Unit = {
val eventLogs = hs.getEventLogPaths(appId, attemptId)
if (eventLogs.isEmpty) logsNotFound = true
else zipLogFiles(eventLogs, output)

val stream = new StreamingOutput {
override def write(output: OutputStream) = hs.writeEventLogs(appId, attemptId, output)
}
}
if (logsNotFound) {
Response.serverError()
.entity(s"Event logs are not available for app: $appId.")
.status(Response.Status.SERVICE_UNAVAILABLE)
.build()
} else {

Response.ok(stream)
.header("Content-Disposition", s"attachment; filename=$fileName")
.header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
.build()

} catch {
case NonFatal(e) =>
Response.serverError()
.entity(s"Event logs are not available for app: $appId.")
.status(Response.Status.SERVICE_UNAVAILABLE)
.build()
}
case _ =>
Response.serverError()
@@ -72,39 +68,4 @@ private[v1] class EventLogDownloadResource(
.build()
}
}

private def zipLogFiles(eventLogs: Seq[Path], output: OutputStream): Unit = {
val areLegacyLogs = eventLogs.headOption.exists { path =>
path.getFileSystem(conf).isDirectory(path)
}
val pathsToZip = if (areLegacyLogs) {
new ArrayBuffer[Path]()
} else {
eventLogs
}
var tempDir: File = null
try {
if (areLegacyLogs) {
tempDir = Utils.createTempDir()
Utils.chmod700(tempDir)
eventLogs.foreach { logPath =>
// If the event logs are directories (legacy), then create a zip file for each
// one and write each of these files to the eventual output.
val fs = logPath.getFileSystem(conf)
val logFiles = fs.listFiles(logPath, true)
val zipFile = new File(tempDir, logPath.getName + ".zip")
pathsToZip.asInstanceOf[ArrayBuffer[Path]] += new Path(zipFile.toURI)
val outputStream = new FileOutputStream(zipFile)
val paths = new ArrayBuffer[Path]()
while (logFiles.hasNext) {
paths += logFiles.next().getPath
}
Utils.zipFilesToStream(paths, conf, outputStream)
}
}
Utils.zipFilesToStream(pathsToZip, conf, output)
} finally {
if (tempDir != null) Utils.deleteRecursively(tempDir)
}
}
}
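The deleted zipLogFiles helper above has, in essence, moved into FsHistoryProvider.writeEventLogs; what remains in this resource is only the streaming glue. A minimal sketch of that pattern, assuming just the JAX-RS StreamingOutput contract (the helper name is invented, not part of the commit):

import java.io.OutputStream
import javax.ws.rs.core.{MediaType, Response, StreamingOutput}

// Illustrative only: wrap a writer callback in a StreamingOutput so the zip is
// produced while the HTTP response body is being written, with no temp file.
def streamedZipResponse(fileName: String)(writer: OutputStream => Unit): Response = {
  val stream = new StreamingOutput {
    override def write(output: OutputStream): Unit = writer(output)
  }
  Response.ok(stream)
    .header("Content-Disposition", s"attachment; filename=$fileName")
    .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
    .build()
}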
6 changes: 0 additions & 6 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -786,12 +786,6 @@ private[spark] object Utils extends Logging {
files: Seq[Path],
hadoopConf: Configuration,
outputStream: OutputStream): Unit = {

// Passing in an output stream actually makes this more efficient since we don't have to
// create an additional file to which the compressed data is written which has to be read
// again by the reader, especially if the data needs to be sent over the wire via an
// OutputStream - in which case the destination output stream can be directly passed in here.

val fs = FileSystem.get(hadoopConf)
val buffer = new Array[Byte](64 * 1024)
val zipStream = Some(new ZipOutputStream(outputStream))
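Only the explanatory comment is removed from zipFilesToStream; its signature is unchanged. A hedged usage sketch against that signature, assuming a caller inside the org.apache.spark package (Utils is private[spark]) and placeholder paths:

import java.io.FileOutputStream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.util.Utils

// Illustrative call only: zip two event log files straight into a local archive.
val hadoopConf = new Configuration()
val eventLogs = Seq(
  new Path("/user/spark/applicationHistory/app-0001"),
  new Path("/user/spark/applicationHistory/app-0002"))
val out = new FileOutputStream("/tmp/eventLogs.zip")
try {
  Utils.zipFilesToStream(eventLogs, hadoopConf, out)
} finally {
  out.close()
}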
