Skip to content

Commit

Permalink
Use UIRoot directly in ApiRootResource. Also, use Response class to…
Browse files Browse the repository at this point in the history
… set headers.
  • Loading branch information
harishreedharan committed May 21, 2015
1 parent 7b362b2 commit 32b7662
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,9 @@ class HistoryServer(
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}

def copyEventLogsToDirectory(appId: String, destDir: File): Unit = {
def copyEventLogsToDirectory(appId: String, destDir: File): Boolean = {
provider.copyApplicationEventLogs(appId, destDir)
true
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.spark.status.api.v1

import java.io.File
import javax.servlet.ServletContext
import javax.ws.rs._
import javax.ws.rs.core.{Context, Response}
Expand Down Expand Up @@ -168,11 +169,8 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
def getEventLogs(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
new EventLogDownloadResource(ui, appId)
}
new EventLogDownloadResource(uiRoot, appId)
}

}

private[spark] object ApiRootResource {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,59 @@
*/
package org.apache.spark.status.api.v1

import java.io.{BufferedInputStream, FileInputStream, OutputStream, File, InputStream}
import javax.ws.rs.ext.Provider
import java.io.{BufferedInputStream, FileInputStream, OutputStream}
import javax.ws.rs.{GET, Produces}
import javax.ws.rs.core.{StreamingOutput, MultivaluedMap, MediaType}
import javax.ws.rs.core.{Response, StreamingOutput, MediaType}

import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.util.Utils


@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
private[v1] class EventLogDownloadResource(val uIRoot: UIRoot, val appId: String) {

private def getErrorOutput(err: String): StreamingOutput = {
new StreamingOutput {
override def write(outputStream: OutputStream): Unit = {
outputStream.write(
s"File download not available for application : $appId due to $err".getBytes("utf-8"))
}
}
}

@GET
@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
def getEventLogs(): StreamingOutput = {
def getEventLogs(): Response = {
uIRoot match {
case hs: HistoryServer =>
val dir = Utils.createTempDir()
Utils.chmod700(dir)
hs.copyEventLogsToDirectory(appId, dir)
dir.listFiles().headOption.foreach { file =>
return new StreamingOutput {
val stream = new StreamingOutput {
override def write(output: OutputStream): Unit = {
val inStream = new BufferedInputStream(new FileInputStream(file))
val buffer = new Array[Byte](1024 * 1024)
var dataRemains = true
while (dataRemains) {
val read = inStream.read(buffer)
if (read > 0) {
output.write(buffer, 0, read)
} else {
dataRemains = false
try {
val inStream = new BufferedInputStream(new FileInputStream(file))
val buffer = new Array[Byte](1024 * 1024)
var dataRemains = true
while (dataRemains) {
val read = inStream.read(buffer)
if (read > 0) {
output.write(buffer, 0, read)
} else {
dataRemains = false
}
}
output.flush()
} finally {
Utils.deleteRecursively(dir)
}
output.flush()
}
}
return Response.ok(stream)
.header("Content-Length", file.length().toString)
.header("Content-Disposition", s"attachment; filename=${file.getName}")
.header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
.build()
}
getErrorOutput("No files in dir.")
case _ => getErrorOutput("hs not history server")
Response.serverError()
.entity(s"Event logs for $appId not found.")
.status(Response.Status.NOT_FOUND)
.build()
case _ =>
Response.serverError()
.entity("History Server is not running - cannot return event logs.")
.status(Response.Status.SERVICE_UNAVAILABLE)
.build()
}
}
}
Expand Down

0 comments on commit 32b7662

Please sign in to comment.