Skip to content

Commit

Permalink
Add more debug messages for ManagedBuffer
Browse files Browse the repository at this point in the history
This is to help debug the error reported at http://apache-spark-user-list.1001560.n3.nabble.com/SQL-queries-fail-in-1-2-0-SNAPSHOT-td15327.html

Author: Reynold Xin <[email protected]>

Closes apache#2580 from rxin/buffer-debug and squashes the following commits:

5814292 [Reynold Xin] Logging close() in case close() fails.
323dfec [Reynold Xin] Add more debug message.
  • Loading branch information
rxin authored and aarondav committed Sep 29, 2014
1 parent dab1b0a commit e43c72f
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 6 deletions.
43 changes: 37 additions & 6 deletions core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@

package org.apache.spark.network

import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
import java.io._
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode

import scala.util.Try

import com.google.common.io.ByteStreams
import io.netty.buffer.{ByteBufInputStream, ByteBuf}

import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.{ByteBufferInputStream, Utils}


/**
Expand Down Expand Up @@ -71,18 +73,47 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
try {
channel = new RandomAccessFile(file, "r").getChannel
channel.map(MapMode.READ_ONLY, offset, length)
} catch {
case e: IOException =>
Try(channel.size).toOption match {
case Some(fileLen) =>
throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
case None =>
throw new IOException(s"Error in opening $this", e)
}
} finally {
if (channel != null) {
channel.close()
Utils.tryLog(channel.close())
}
}
}

/**
 * Opens an InputStream limited to this buffer's byte range, i.e. `length` bytes of
 * `file` starting at `offset`.
 *
 * On any failure the partially opened stream is closed via Utils.tryLog (so a failing
 * close() is logged instead of masking the original error). IOExceptions are rethrown
 * wrapped with this buffer's description and, when obtainable, the file's actual length —
 * added to help debug reads past the end of a truncated file.
 *
 * NOTE(review): `file.length` returns 0 rather than throwing when the file is missing,
 * so the `None` branch below is likely unreachable — confirm before relying on it.
 */
override def inputStream(): InputStream = {
  var is: FileInputStream = null
  try {
    is = new FileInputStream(file)
    is.skip(offset)
    ByteStreams.limit(is, length)
  } catch {
    case e: IOException =>
      if (is != null) {
        // Best-effort cleanup; log (don't throw) if close() itself fails.
        Utils.tryLog(is.close())
      }
      Try(file.length).toOption match {
        case Some(fileLen) =>
          throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
        case None =>
          throw new IOException(s"Error in opening $this", e)
      }
    case e: Throwable =>
      // Non-IO failures: still release the stream, then propagate unchanged.
      if (is != null) {
        Utils.tryLog(is.close())
      }
      throw e
  }
}

/** Buffer description used in error messages: runtime class name plus (file, offset, length). */
override def toString: String = getClass.getName + "(" + file + ", " + offset + ", " + length + ")"
}


Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,20 @@ private[spark] object Utils extends Logging {
}
}

/**
 * Runs the by-name block `f` and captures its outcome in a [[scala.util.Try]].
 *
 * Any exception escaping `f` is logged together with the current thread's name and
 * returned as a `Failure`, except `ControlThrowable`, which is rethrown so Scala's
 * control-flow constructs (e.g. non-local returns) continue to work.
 *
 * @param f block to evaluate
 * @return `Success` with the block's result, or `Failure` wrapping the logged exception
 */
def tryLog[T](f: => T): Try[T] = {
  try {
    scala.util.Success(f)
  } catch {
    case ct: ControlThrowable =>
      // Control-flow throwables must propagate untouched.
      throw ct
    case t: Throwable =>
      logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      scala.util.Failure(t)
  }
}

/** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
def isFatalError(e: Throwable): Boolean = {
e match {
Expand Down

0 comments on commit e43c72f

Please sign in to comment.