Skip to content

Commit

Permalink
Reintroduced insertStatement method
Browse files Browse the repository at this point in the history
  • Loading branch information
CK50 committed Dec 22, 2015
1 parent a59c1aa commit c130e31
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ object JdbcUtils extends Logging {
conn.createStatement.executeUpdate(s"DROP TABLE $table")
}

/**
* Returns a PreparedStatement that inserts a row into table via conn.
*/
def insertStatement(conn: Connection,
table: String,
rddSchema: StructType,
dialect: JdbcDialect): PreparedStatement = {
val sql = dialect.getInsertStatement(table, rddSchema)
conn.prepareStatement(sql)
}

/**
* Retrieve standard jdbc types.
* @param dt The datatype (e.g. [[org.apache.spark.sql.types.StringType]])
Expand Down Expand Up @@ -125,8 +136,7 @@ object JdbcUtils extends Logging {
if (supportsTransactions) {
conn.setAutoCommit(false) // Everything in the same db transaction.
}
val sql = dialect.getInsertStatement(table, rddSchema)
val stmt = conn.prepareStatement(sql)
val stmt = insertStatement(conn, table, rddSchema, dialect)
try {
var rowCount = 0
while (iterator.hasNext) {
Expand Down

0 comments on commit c130e31

Please sign in to comment.