[SPARK-7862] [SQL] Disable the error message redirect to stderr #6882

Closed · wants to merge 6 commits
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2333,3 +2333,36 @@ private[spark] class RedirectThread(
}
}
}

/**
* An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
* in a circular buffer. The current contents of the buffer can be accessed using
* the toString method.
*/
private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](sizeInBytes)

def write(i: Int): Unit = {
buffer(pos) = i
pos = (pos + 1) % buffer.length
}

override def toString: String = {
val (end, start) = buffer.splitAt(pos)
val input = new java.io.InputStream {
val iterator = (start ++ end).iterator

def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
val reader = new BufferedReader(new InputStreamReader(input))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while (line != null) {
stringBuilder.append(line)
stringBuilder.append("\n")
line = reader.readLine()
}
stringBuilder.toString()
}
}
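
A rough usage sketch of the class above (not part of the patch; the demo object, buffer size, and strings are illustrative, and the package declaration is only there because CircularBuffer is private[spark]):

package org.apache.spark.util

import java.io.PrintStream

object CircularBufferDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new CircularBuffer(16)              // keep only the last 16 bytes written
    val out = new PrintStream(buffer, true, "UTF-8")
    out.println("0123456789")                        // 11 bytes including the newline
    out.println("abcdefghij")                        // 11 more bytes, partly evicting the first write
    print(buffer.toString)                           // roughly "6789\nabcdefghij\n": only the tail survives
  }
}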
8 changes: 8 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(!Utils.isInDirectory(nullFile, parentDir))
assert(!Utils.isInDirectory(nullFile, childFile3))
}

test("circular buffer") {
val buffer = new CircularBuffer(25)
val stream = new java.io.PrintStream(buffer, true, "UTF-8")

stream.println("test circular test circular test circular test circular test circular")
assert(buffer.toString === "t circular test circular\n")
}
}
@@ -22,6 +22,8 @@ import java.net.URI
import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
import javax.annotation.concurrent.GuardedBy

import org.apache.spark.util.CircularBuffer

import scala.collection.JavaConversions._
import scala.language.reflectiveCalls

@@ -66,32 +68,7 @@ private[hive] class ClientWrapper(
with Logging {

// Circular buffer to hold what Hive prints to STDOUT and STDERR. Only printed when failures occur.
private val outputBuffer = new java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](10240)
def write(i: Int): Unit = {
buffer(pos) = i
pos = (pos + 1) % buffer.size
}

override def toString: String = {
val (end, start) = buffer.splitAt(pos)
val input = new java.io.InputStream {
val iterator = (start ++ end).iterator

def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
val reader = new BufferedReader(new InputStreamReader(input))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while(line != null) {
stringBuilder.append(line)
stringBuilder.append("\n")
line = reader.readLine()
}
stringBuilder.toString()
}
}
private val outputBuffer = new CircularBuffer()

private val shim = version match {
case hive.v12 => new Shim_v0_12()
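
Per the comment at the top of this hunk, the buffer exists so that Hive's console output is held in a small, bounded window and only surfaced when a command fails. A hedged sketch of that general pattern (illustrative names only, not the actual ClientWrapper internals; the package declaration is just to make the private[spark] class visible):

package org.apache.spark.util

import java.io.PrintStream

// Illustrative pattern: capture a component's console output in a bounded buffer
// and expose it only when the command it ran reports a failure.
class QuietCommandRunner {
  private val outputBuffer = new CircularBuffer()                   // last ~10 KB only
  private val capturedOut = new PrintStream(outputBuffer, true, "UTF-8")

  def run(command: PrintStream => Int): Unit = {
    val exitCode = command(capturedOut)                             // the command writes to capturedOut
    if (exitCode != 0) {
      // Only on failure is the captured (and bounded) output surfaced to the caller.
      throw new RuntimeException(
        s"Command failed with exit code $exitCode. Captured output:\n${outputBuffer.toString}")
    }
  }
}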
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.Utils
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}

/**
* Transforms the input by forking and running the specified script.
@@ -59,15 +59,13 @@ case class ScriptTransformation(
child.execute().mapPartitions { iter =>
val cmd = List("/bin/bash", "-c", script)
val builder = new ProcessBuilder(cmd)
// redirectError(Redirect.INHERIT) would consume the error output from buffer and
// then print it to stderr (inherit the target from the current Scala process).
// If without this there would be 2 issues:
// We need to start threads connected to the process pipeline:
// 1) The error msg generated by the script process would be hidden.
// 2) If the error msg is big enough to choke up the buffer, the input logic would hang
builder.redirectError(Redirect.INHERIT)
val proc = builder.start()
val inputStream = proc.getInputStream
val outputStream = proc.getOutputStream
val errorStream = proc.getErrorStream
val reader = new BufferedReader(new InputStreamReader(inputStream))

val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output)
@@ -152,29 +150,43 @@ case class ScriptTransformation(
val dataOutputStream = new DataOutputStream(outputStream)
val outputProjection = new InterpretedProjection(input, child.output)

// TODO make the 2048 configurable?
val stderrBuffer = new CircularBuffer(2048)
// Consume the error stream from the pipeline; otherwise the process will block once
// the pipe buffer fills up.
new RedirectThread(errorStream, // input stream from the pipeline
stderrBuffer, // output to a circular buffer
"Thread-ScriptTransformation-STDERR-Consumer").start()

// Put the writes (output to the pipeline) into a separate thread
// and keep the collector in the main thread;
// otherwise it would cause a deadlock if the data size is greater than
// the pipeline / buffer capacity.
new Thread(new Runnable() {
override def run(): Unit = {
iter
.map(outputProjection)
.foreach { row =>
if (inputSerde == null) {
val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")

outputStream.write(data)
} else {
val writable = inputSerde.serialize(
row.asInstanceOf[GenericInternalRow].values, inputSoi)
prepareWritable(writable).write(dataOutputStream)
Utils.tryWithSafeFinally {
iter
.map(outputProjection)
.foreach { row =>
if (inputSerde == null) {
val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")

outputStream.write(data)
} else {
val writable = inputSerde.serialize(
row.asInstanceOf[GenericInternalRow].values, inputSoi)
prepareWritable(writable).write(dataOutputStream)
}
}
outputStream.close()
} {
if (proc.waitFor() != 0) {
logError(stderrBuffer.toString) // log the stderr circular buffer
}
}
outputStream.close()
}
}).start()
}, "Thread-ScriptTransformation-Feed").start()

iterator
}
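
Stripped of the Hive and SerDe details, the hunk above sets up three cooperating parties: a thread that drains the script's stderr into a bounded buffer, a thread that feeds rows into the script's stdin, and the calling thread that reads the script's stdout. A standalone sketch of that shape (everything here is illustrative; a plain ByteArrayOutputStream stands in for the CircularBuffer):

import java.io.{BufferedReader, ByteArrayOutputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object ScriptPipelineSketch {
  def main(args: Array[String]): Unit = {
    // A script that copies stdin to stdout and also complains on stderr.
    val proc = new ProcessBuilder("/bin/bash", "-c", "cat; echo 'done, with warnings' >&2").start()

    // 1) Drain stderr on its own thread so a chatty script cannot fill the pipe and stall.
    val stderrSink = new ByteArrayOutputStream()
    val stderrThread = new Thread(new Runnable {
      override def run(): Unit = {
        val buf = new Array[Byte](1024)
        var n = proc.getErrorStream.read(buf)
        while (n != -1) {
          stderrSink.write(buf, 0, n)
          n = proc.getErrorStream.read(buf)
        }
      }
    }, "stderr-consumer")
    stderrThread.start()

    // 2) Feed stdin from another thread; feeding and reading on the same thread can
    //    deadlock once the OS pipe buffers fill up.
    val feedThread = new Thread(new Runnable {
      override def run(): Unit = {
        val out = proc.getOutputStream
        (1 to 100000).foreach { i =>
          out.write(s"row $i\n".getBytes(StandardCharsets.UTF_8))
        }
        out.close()
      }
    }, "stdin-feeder")
    feedThread.start()

    // 3) The calling thread consumes stdout.
    val reader = new BufferedReader(new InputStreamReader(proc.getInputStream))
    var lines = 0
    var line = reader.readLine()
    while (line != null) {
      lines += 1
      line = reader.readLine()
    }

    feedThread.join()
    stderrThread.join()
    if (proc.waitFor() != 0) {
      // Mirrors the diff: the captured stderr is only reported when the script fails.
      System.err.println("script failed, stderr was:\n" + stderrSink.toString("UTF-8"))
    }
    println(s"read $lines lines back; captured stderr: " + stderrSink.toString("UTF-8").trim)
  }
}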
@@ -278,3 +290,4 @@ case class HiveScriptIOSchema (
}
}
}

@@ -653,7 +653,7 @@ class SQLQuerySuite extends QueryTest {
.queryExecution.toRdd.count())
}

ignore("test script transform for stderr") {
test("test script transform for stderr") {
val data = (1 to 100000).map { i => (i, i, i) }
data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
assert(0 ===
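
The assertion above is cut off in this view. For orientation only, a test of this shape would run a TRANSFORM whose script writes solely to stderr; the statement below is a hypothetical illustration in the context of this suite (where sql comes from the test Hive context), not necessarily the exact query in the file:

// Hypothetical illustration: 'cat 1>&2' copies every input row to stderr and emits
// nothing on stdout, so the query yields zero rows while the script's stderr is
// consumed by the new redirect thread instead of blocking the pipeline.
assert(0 ===
  sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a, b, c) FROM script_trans")
    .queryExecution.toRdd.count())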