Simplify tests, use Guava stream copy methods.
harishreedharan committed Jun 3, 2015
1 parent d8ddede · commit 6e8156e
Showing 3 changed files with 50 additions and 83 deletions.

FsHistoryProvider.scala
@@ -23,6 +23,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
 
+import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
@@ -233,19 +234,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
     val fs = FileSystem.get(hadoopConf)
-    val buffer = new Array[Byte](64 * 1024)
     val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
     try {
       outputStream.putNextEntry(new ZipEntry(entryName))
-      var dataRemaining = true
-      while (dataRemaining) {
-        val length = inputStream.read(buffer)
-        if (length != -1) {
-          outputStream.write(buffer, 0, length)
-        } else {
-          dataRemaining = false
-        }
-      }
+      ByteStreams.copy(inputStream, outputStream)
       outputStream.closeEntry()
     } finally {
       inputStream.close()
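
For reference, a minimal, self-contained sketch of the copy pattern the new zipFileToStream body relies on. The file name example.zip, entry name greeting.txt, and payload are illustrative stand-ins, not values from this commit; the point is that Guava's ByteStreams.copy drains an InputStream into an OutputStream using its own internal buffer, replacing the hand-rolled 64 KB read/write loop:

import java.io.{ByteArrayInputStream, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

import com.google.common.io.ByteStreams

object ZipCopySketch {
  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream("hello, zip".getBytes("UTF-8"))
    val out = new ZipOutputStream(new FileOutputStream("example.zip"))
    try {
      out.putNextEntry(new ZipEntry("greeting.txt"))
      // Copies until EOF; no manual buffer or read-loop bookkeeping needed.
      ByteStreams.copy(in, out)
      out.closeEntry()
    } finally {
      in.close()
      out.close()
    }
  }
}
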
FsHistoryProviderSuite.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedOutputStream, FileInputStream, File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File,
+  FileOutputStream, OutputStreamWriter}
 import java.net.URI
 import java.util.concurrent.TimeUnit
-import java.util.zip.ZipOutputStream
+import java.util.zip.{ZipInputStream, ZipOutputStream}
 
 import scala.io.Source
 
-import org.apache.commons.io.IOUtils
+import com.google.common.base.Charsets
+import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -351,36 +353,23 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     provider.checkForLogs()
 
     (1 to 2).foreach { i =>
-      val outDir = Utils.createTempDir()
-      val unzipDir = Utils.createTempDir()
-      try {
-        Utils.chmod700(outDir)
-        Utils.chmod700(unzipDir)
-        val outFile = new File(outDir, s"file$i.zip")
-        val outputStream = new ZipOutputStream(new FileOutputStream(outFile))
-        provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
-        HistoryTestUtils.unzipToDir(new FileInputStream(outFile), unzipDir)
-        val actualFiles = unzipDir.listFiles()
-        assert(actualFiles.length == 1)
-        actualFiles.foreach { actualFile =>
-          val expFile = logs.find(_.getName == actualFile.getName).get
-          val expStream = new FileInputStream(expFile)
-          val resultStream = new FileInputStream(actualFile)
-          try {
-            val input = IOUtils.toString(expStream)
-            val out = IOUtils.toString(resultStream)
-            out should be(input)
-          } finally {
-            Seq(expStream, resultStream).foreach { s =>
-              Utils.tryWithSafeFinally(s.close())()
-            }
-          }
-        }
-      } finally {
-        Seq(outDir, unzipDir).foreach { f =>
-          Utils.tryWithSafeFinally(Utils.deleteRecursively(f))()
-        }
-      }
+      val underlyingStream = new ByteArrayOutputStream()
+      val outputStream = new ZipOutputStream(underlyingStream)
+      provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
+      outputStream.close()
+      val inputStream = new ZipInputStream(new ByteArrayInputStream(underlyingStream.toByteArray))
+      var totalEntries = 0
+      var entry = inputStream.getNextEntry
+      entry should not be null
+      while (entry != null) {
+        val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8)
+        val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8)
+        actual should be (expected)
+        totalEntries += 1
+        entry = inputStream.getNextEntry
+      }
+      totalEntries should be (1)
+      inputStream.close()
     }
   }
 
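
The rewritten test above swaps temp directories and an unzip helper for a pure in-memory round trip. A self-contained sketch of that pattern, with an illustrative entry name (app1.inprogress) and payload rather than values produced by the real provider:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

import com.google.common.base.Charsets
import com.google.common.io.ByteStreams

object InMemoryZipRoundTrip {
  def main(args: Array[String]): Unit = {
    // Build the zip in memory, as the test now does around writeEventLogs.
    val underlying = new ByteArrayOutputStream()
    val zipOut = new ZipOutputStream(underlying)
    zipOut.putNextEntry(new ZipEntry("app1.inprogress"))
    zipOut.write("sample event log contents".getBytes(Charsets.UTF_8))
    zipOut.closeEntry()
    zipOut.close()

    // Read it back without touching the filesystem.
    val zipIn = new ZipInputStream(new ByteArrayInputStream(underlying.toByteArray))
    var entry = zipIn.getNextEntry
    while (entry != null) {
      // ByteStreams.toByteArray stops at the end of the current entry,
      // because ZipInputStream reports EOF per entry.
      val contents = new String(ByteStreams.toByteArray(zipIn), Charsets.UTF_8)
      println(s"${entry.getName} -> $contents")
      entry = zipIn.getNextEntry
    }
    zipIn.close()
  }
}
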
HistoryServerSuite.scala
@@ -18,16 +18,18 @@ package org.apache.spark.deploy.history
 
 import java.io.{File, FileInputStream, FileWriter, InputStream, IOException}
 import java.net.{HttpURLConnection, URL}
+import java.util.zip.ZipInputStream
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
+import com.google.common.base.Charsets
+import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.{FileUtils, IOUtils}
 import org.mockito.Mockito.when
 import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
 import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
 
 /**
  * A collection of tests against the historyserver, including comparing responses from the json
@@ -169,18 +171,6 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with
   // Test that the files are downloaded correctly, and validate them.
   def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = {
 
-    def validateFile(expStream: FileInputStream, actualStream: FileInputStream): Unit = {
-      try {
-        val expected = IOUtils.toString(expStream)
-        val actual = IOUtils.toString(actualStream)
-        actual should be(expected)
-      } finally {
-        Seq(expStream, actualStream).foreach { s =>
-          Utils.tryWithSafeFinally(s.close())()
-        }
-      }
-    }
-
     val url = attemptId match {
       case Some(id) =>
         new URL(s"${generateURL(s"applications/$appId")}/$id/logs")
@@ -193,39 +183,35 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with
     inputStream should not be None
     error should be (None)
 
-    val dir = Utils.createTempDir()
-    try {
-      Utils.chmod700(dir)
-      HistoryTestUtils.unzipToDir(inputStream.get, dir)
-      val unzippedContent = dir.listFiles()
-      attemptId match {
-        case Some(_) => unzippedContent.length should be (1)
-        case None => unzippedContent.length should be (2)
-      }
-
-      // If these are legacy files, then each of the unzipped contents is actually a legacy log dir.
+    val zipStream = new ZipInputStream(inputStream.get)
+    var entry = zipStream.getNextEntry
+    entry should not be null
     val totalFiles = {
       if (legacy) {
-        unzippedContent.foreach { legacyDir =>
-          assert(legacyDir.isDirectory)
-          val logFiles = legacyDir.listFiles()
-          logFiles.length should be (3)
-          logFiles.foreach { f =>
-            val actualStream = new FileInputStream(f)
-            val expectedStream =
-              new FileInputStream(new File(new File(logDir, legacyDir.getName), f.getName))
-            validateFile(expectedStream, actualStream)
-          }
-        }
         attemptId.map { x => 3 }.getOrElse(6)
       } else {
-        unzippedContent.foreach { f =>
-          val actualStream = new FileInputStream(f)
-          val expectedStream = new FileInputStream(new File(logDir, f.getName))
-          validateFile(expectedStream, actualStream)
-        }
         attemptId.map { x => 1 }.getOrElse(2)
       }
     }
+    var filesCompared = 0
+    while(entry != null) {
+      if (!entry.isDirectory) {
+        val expectedFile = {
+          if (legacy) {
+            val splits = entry.getName.split("/")
+            new File(new File(logDir, splits(0)), splits(1))
+          } else {
+            new File(logDir, entry.getName)
+          }
+        }
+        val expected = Files.toString(expectedFile, Charsets.UTF_8)
+        val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)
+        actual should be (expected)
+        filesCompared += 1
+      }
-    } finally {
-      Utils.deleteRecursively(dir)
+      entry = zipStream.getNextEntry
     }
+    filesCompared should be (totalFiles)
   }
 
   test("response codes on bad paths") {
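
A short sketch of the entry-by-entry check the new doDownloadTest loop performs, kept self-contained; the helper name countMatchingEntries and its zipFile/baseDir parameters are hypothetical, and the path splitting mirrors the legacy branch above, which maps an entry name like appDir/logFile back to a nested file under the log directory:

import java.io.{File, FileInputStream}
import java.util.zip.ZipInputStream

import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}

object ZipValidationSketch {
  // Compares every regular entry in the zip against the file it was built
  // from under baseDir, and returns how many entries were checked.
  def countMatchingEntries(zipFile: File, baseDir: File): Int = {
    val zipStream = new ZipInputStream(new FileInputStream(zipFile))
    var filesCompared = 0
    try {
      var entry = zipStream.getNextEntry
      while (entry != null) {
        if (!entry.isDirectory) {
          // "a/b" becomes baseDir/a/b, like the legacy split("/") handling.
          val expectedFile =
            entry.getName.split("/").foldLeft(baseDir)((dir, part) => new File(dir, part))
          val expected = Files.toString(expectedFile, Charsets.UTF_8)
          val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)
          require(actual == expected, s"mismatch for ${entry.getName}")
          filesCompared += 1
        }
        entry = zipStream.getNextEntry
      }
    } finally {
      zipStream.close()
    }
    filesCompared
  }
}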
