-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-37867][SQL] Compile aggregate functions of built-in JDBC dialect #35166
Changes from 11 commits
9bb754e
fcf6e88
d342f86
d71b293
b9e648d
e626cd8
fc5f712
8ea6bbd
7aa569a
9c4ff55
7dc4597
c23e8e4
4dd1379
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
package org.apache.spark.sql.jdbc.v2 | ||
|
||
import java.sql.Connection | ||
import java.util.Locale | ||
|
||
import org.scalatest.time.SpanSugar._ | ||
|
||
|
@@ -38,6 +39,7 @@ import org.apache.spark.tags.DockerTest | |
@DockerTest | ||
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { | ||
override val catalogName: String = "db2" | ||
override val namespaceOpt: Option[String] = Some("DB2INST1") | ||
override val db = new DatabaseOnDocker { | ||
override val imageName = sys.env.getOrElse("DB2_DOCKER_IMAGE_NAME", "ibmcom/db2:11.5.6.0a") | ||
override val env = Map( | ||
|
@@ -59,8 +61,19 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { | |
override def sparkConf: SparkConf = super.sparkConf | ||
.set("spark.sql.catalog.db2", classOf[JDBCTableCatalog].getName) | ||
.set("spark.sql.catalog.db2.url", db.getJdbcUrl(dockerIp, externalPort)) | ||
.set("spark.sql.catalog.db2.pushDownAggregate", "true") | ||
|
||
override def dataPreparation(conn: Connection): Unit = {} | ||
override def dataPreparation(conn: Connection): Unit = { | ||
conn.prepareStatement( | ||
"CREATE TABLE employee (dept INTEGER, name CLOB, salary DECIMAL(20, 2), bonus DOUBLE)") | ||
.executeUpdate() | ||
conn.prepareStatement( | ||
""" | ||
|INSERT INTO employee VALUES | ||
|(1, 'amy', 10000, 1000), (2, 'alex', 12000, 1200), (1, 'cathy', 9000, 1200), | ||
|(2, 'david', 10000, 1300), (6, 'jen', 12000, 1200) | ||
|""".stripMargin).executeUpdate() | ||
} | ||
|
||
override def testUpdateColumnType(tbl: String): Unit = { | ||
sql(s"CREATE TABLE $tbl (ID INTEGER)") | ||
|
@@ -86,4 +99,8 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { | |
val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) | ||
assert(t.schema === expectedSchema) | ||
} | ||
|
||
override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think DB2 objects names are not case sensitive. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tests many time. If uses the table name with lower case, DB2 throws Exception about not find table. |
||
|
||
testVarPop() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
package org.apache.spark.sql.jdbc.v2 | ||
|
||
import java.sql.Connection | ||
import java.util.Locale | ||
|
||
import org.scalatest.time.SpanSugar._ | ||
|
||
|
@@ -56,6 +57,7 @@ import org.apache.spark.tags.DockerTest | |
@DockerTest | ||
class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { | ||
override val catalogName: String = "oracle" | ||
override val namespaceOpt: Option[String] = Some("SYSTEM") | ||
override val db = new DatabaseOnDocker { | ||
lazy override val imageName = | ||
sys.env.getOrElse("ORACLE_DOCKER_IMAGE_NAME", "gvenzl/oracle-xe:18.4.0") | ||
|
@@ -73,9 +75,25 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest | |
override def sparkConf: SparkConf = super.sparkConf | ||
.set("spark.sql.catalog.oracle", classOf[JDBCTableCatalog].getName) | ||
.set("spark.sql.catalog.oracle.url", db.getJdbcUrl(dockerIp, externalPort)) | ||
.set("spark.sql.catalog.oracle.pushDownAggregate", "true") | ||
|
||
override val connectionTimeout = timeout(7.minutes) | ||
override def dataPreparation(conn: Connection): Unit = {} | ||
|
||
override def dataPreparation(conn: Connection): Unit = { | ||
conn.prepareStatement( | ||
"CREATE TABLE employee (dept NUMBER(32), name VARCHAR2(32), salary NUMBER(20, 2)," + | ||
" bonus BINARY_DOUBLE)").executeUpdate() | ||
conn.prepareStatement("INSERT INTO employee VALUES (1, 'amy', 10000, 1000)") | ||
.executeUpdate() | ||
conn.prepareStatement("INSERT INTO employee VALUES (2, 'alex', 12000, 1200)") | ||
.executeUpdate() | ||
conn.prepareStatement("INSERT INTO employee VALUES (1, 'cathy', 9000, 1200)") | ||
.executeUpdate() | ||
conn.prepareStatement("INSERT INTO employee VALUES (2, 'david', 10000, 1300)") | ||
.executeUpdate() | ||
conn.prepareStatement("INSERT INTO employee VALUES (6, 'jen', 12000, 1200)") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like every database can create the table and then insert the values with a single multi-row statement. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. First, each database supports different data types, so there is no common CREATE TABLE statement. |
||
.executeUpdate() | ||
} | ||
|
||
override def testUpdateColumnType(tbl: String): Unit = { | ||
sql(s"CREATE TABLE $tbl (ID INTEGER)") | ||
|
@@ -93,4 +111,14 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest | |
assert(msg1.contains( | ||
s"Cannot update $catalogName.alt_table field ID: string cannot be cast to int")) | ||
} | ||
|
||
override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT) | ||
|
||
testVarPop() | ||
testVarSamp() | ||
testStddevPop() | ||
testStddevSamp() | ||
testCovarPop() | ||
testCovarSamp() | ||
testCorr() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use `VARCHAR(n)` instead of `CLOB`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated