Skip to content

Commit

Permalink
Almost working.
Browse files Browse the repository at this point in the history
  • Loading branch information
harishreedharan committed May 20, 2015
1 parent 3d18ebc commit 7b362b2
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.status.api.v1

import java.io.OutputStream
import java.lang.annotation.Annotation
import java.lang.reflect.Type
import javax.ws.rs.Produces
import javax.ws.rs.core.{MultivaluedMap, MediaType}
import javax.ws.rs.ext.{Provider, MessageBodyWriter}

class DownloadMessageWriter extends MessageBodyWriter[Object] {

override def writeTo(t: Object, `type`: Class[_], genericType: Type,
annotations: Array[Annotation], mediaType: MediaType,
httpHeaders: MultivaluedMap[String, AnyRef], entityStream: OutputStream): Unit = {
t match {
case downloader @ EventLogDownloadResource(_) =>
downloader.getEventLogs()
}
}

override def getSize(t: Object, `type`: Class[_], genericType: Type,
annotations: Array[Annotation], mediaType: MediaType): Long = {
-1L
}

override def isWriteable(`type`: Class[_], genericType: Type, annotations: Array[Annotation],
mediaType: MediaType): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,54 @@
*/
package org.apache.spark.status.api.v1

import java.io.{FileInputStream, OutputStream, File, InputStream}
import java.io.{BufferedInputStream, FileInputStream, OutputStream, File, InputStream}
import javax.ws.rs.ext.Provider
import javax.ws.rs.{GET, Produces}
import javax.ws.rs.core.{MultivaluedMap, MediaType}
import javax.ws.rs.core.{StreamingOutput, MultivaluedMap, MediaType}

import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.util.Utils

@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))

private[v1] class EventLogDownloadResource(val uIRoot: UIRoot, val appId: String) {

private def getErrorOutput(err: String): StreamingOutput = {
new StreamingOutput {
override def write(outputStream: OutputStream): Unit = {
outputStream.write(
s"File download not available for application : $appId due to $err".getBytes("utf-8"))
}
}
}

@GET
def getEventLogs(headers: MultivaluedMap[String, AnyRef], outputStream: OutputStream): Unit = {
@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
def getEventLogs(): StreamingOutput = {
uIRoot match {
case hs: HistoryServer =>
val dir = Utils.createTempDir()
Utils.chmod700(dir)
hs.copyEventLogsToDirectory(appId, dir)
dir.listFiles().headOption.foreach { zipFile =>
headers.add("Content-Length", zipFile.length().toString)
headers.add("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
headers.add("Content-Disposition", s"attachment; filename=${zipFile.getName}")
var inputStream: InputStream = null
try {
inputStream = new FileInputStream(zipFile)
val buffer = new Array[Byte](1024 * 1024)
var remaining = true
while (remaining) {
val read = inputStream.read(buffer)
if (read != -1) {
outputStream.write(buffer, 0, read)
} else {
remaining = false
dir.listFiles().headOption.foreach { file =>
return new StreamingOutput {
override def write(output: OutputStream): Unit = {
val inStream = new BufferedInputStream(new FileInputStream(file))
val buffer = new Array[Byte](1024 * 1024)
var dataRemains = true
while (dataRemains) {
val read = inStream.read(buffer)
if (read > 0) {
output.write(buffer, 0, read)
} else {
dataRemains = false
}
}
output.flush()
}
outputStream.flush()
} finally {
inputStream.close()
Utils.deleteRecursively(dir)
}
}
case _ => outputStream.write(
s"File download not available for application : $appId".getBytes("utf-8"))
getErrorOutput("No files in dir.")
case _ => getErrorOutput("hs not history server")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
* Note that jersey automatically discovers this class based on its package and its annotations.
*/
@Provider
@Produces(Array(MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM))
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{

val mapper = new ObjectMapper() {
Expand Down Expand Up @@ -69,8 +69,6 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
outputStream: OutputStream): Unit = {
t match {
case ErrorWrapper(err) => outputStream.write(err.getBytes())
case downloader @ EventLogDownloadResource(_) =>
downloader.getEventLogs(multivaluedMap, outputStream)
case _ => mapper.writeValue(outputStream, t)
}
}
Expand Down

0 comments on commit 7b362b2

Please sign in to comment.