Skip to content

Commit

Permalink
[SPARK-7601] [SQL] Support Insert into JDBC Datasource
Browse files Browse the repository at this point in the history
Supported InsertableRelation for JDBC Datasource JDBCRelation.
Example usage:
sqlContext.sql(
      s"""
        |CREATE TEMPORARY TABLE testram1
        |USING org.apache.spark.sql.jdbc
        |OPTIONS (url '$url', dbtable 'testram1', user 'xx', password 'xx', driver 'com.h2.Driver')
      """.stripMargin.replaceAll("\n", " "))

sqlContext.sql("insert into table testram1 select * from testsrc")
sqlContext.sql("insert overwrite table testram1 select * from testsrc")

Author: Venkata Ramana Gollamudi <[email protected]>

Closes apache#6121 from gvramana/JDBCDatasource_insert and squashes the following commits:

f3fb5f1 [Venkata Ramana Gollamudi] Support for JDBC Datasource InsertableRelation
  • Loading branch information
gvramana authored and marmbrus committed May 14, 2015
1 parent 73bed40 commit 59aaa1d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -129,7 +130,8 @@ private[sql] case class JDBCRelation(
parts: Array[Partition],
properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
extends BaseRelation
with PrunedFilteredScan {
with PrunedFilteredScan
with InsertableRelation {

override val needConversion: Boolean = false

Expand All @@ -148,4 +150,8 @@ private[sql] case class JDBCRelation(
filters,
parts)
}

override def insert(data: DataFrame, overwrite: Boolean): Unit = {
data.insertIntoJDBC(url, table, overwrite, properties)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,29 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {

conn1 = DriverManager.getConnection(url1, properties)
conn1.prepareStatement("create schema test").executeUpdate()
conn1.prepareStatement("drop table if exists test.people").executeUpdate()
conn1.prepareStatement(
"create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
conn1.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate()
conn1.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate()
conn1.prepareStatement("drop table if exists test.people1").executeUpdate()
conn1.prepareStatement(
"create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
conn1.commit()

TestSQLContext.sql(
s"""
|CREATE TEMPORARY TABLE PEOPLE
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url1', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))

TestSQLContext.sql(
s"""
|CREATE TEMPORARY TABLE PEOPLE1
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
}

after {
Expand Down Expand Up @@ -114,5 +137,17 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
}
}


test("INSERT to JDBC Datasource") {
TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}

test("INSERT to JDBC Datasource with overwrite") {
TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}
}

0 comments on commit 59aaa1d

Please sign in to comment.