Skip to content

Commit

Permalink
SPARK-2689 Fix potential leak of connection/PreparedStatement in case…
Browse files Browse the repository at this point in the history
… of error in JdbcRDD
  • Loading branch information
Stephen Boesch committed Aug 5, 2014
1 parent 095b2c9 commit 3fb23ed
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.rdd

import java.sql.{Connection, ResultSet}
import java.sql.{PreparedStatement, Connection, ResultSet}

import scala.reflect.ClassTag

Expand Down Expand Up @@ -70,20 +70,30 @@ class JdbcRDD[T: ClassTag](
override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
context.addOnCompleteCallback{ () => closeIfNeeded() }
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
var conn : Connection = _
var stmt : PreparedStatement = _
try {
conn = getConnection()
stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

// setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
// rather than pulling entire resultset into memory.
// see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
stmt.setFetchSize(Integer.MIN_VALUE)
logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
}
// setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
// rather than pulling entire resultset into memory.
// see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
stmt.setFetchSize(Integer.MIN_VALUE)
logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
}

stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()

} catch {
case e: Exception =>
close()
logError("Exception occurred on creating connection/preparedStatement", e)
throw e // Is it correct to throw Exception, or what is preferred cleanup here?
}

override def getNext: T = {
if (rs.next()) {
Expand All @@ -106,7 +116,7 @@ class JdbcRDD[T: ClassTag](
case e: Exception => logWarning("Exception closing statement", e)
}
try {
if (null != conn && ! conn.isClosed()) conn.close()
if (null != conn && ! stmt.isClosed()) conn.close()
logInfo("closed connection")
} catch {
case e: Exception => logWarning("Exception closing connection", e)
Expand All @@ -120,3 +130,4 @@ object JdbcRDD {
Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
}
}

0 comments on commit 3fb23ed

Please sign in to comment.