From 157fb0e8df7b87579ca64a2d3a64212675baf644 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Wed, 10 Jan 2024 21:16:10 -0800
Subject: [PATCH] Testing insert overwrite with select, need to revert later

---
 .github/workflows/bot.yml                     | 50 ++++++-------
 .../spark/sql/hudi/TestInsertTable.scala      | 71 +++++++++++++++++++
 2 files changed, 96 insertions(+), 25 deletions(-)

diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 67c7ac16eaa24..d2f536216685a 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -56,29 +56,29 @@ jobs:
     strategy:
       matrix:
         include:
-          - scalaProfile: "scala-2.11"
-            sparkProfile: "spark2.4"
-            sparkModules: "hudi-spark-datasource/hudi-spark2"
-
-          - scalaProfile: "scala-2.12"
-            sparkProfile: "spark3.0"
-            sparkModules: "hudi-spark-datasource/hudi-spark3.0.x"
-
-          - scalaProfile: "scala-2.12"
-            sparkProfile: "spark3.1"
-            sparkModules: "hudi-spark-datasource/hudi-spark3.1.x"
-
-          - scalaProfile: "scala-2.12"
-            sparkProfile: "spark3.2"
-            sparkModules: "hudi-spark-datasource/hudi-spark3.2.x"
-
-          - scalaProfile: "scala-2.12"
-            sparkProfile: "spark3.3"
-            sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
-          - scalaProfile: "scala-2.12"
-            sparkProfile: "spark3.4"
-            sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
+#          - scalaProfile: "scala-2.11"
+#            sparkProfile: "spark2.4"
+#            sparkModules: "hudi-spark-datasource/hudi-spark2"
+#
+#          - scalaProfile: "scala-2.12"
+#            sparkProfile: "spark3.0"
+#            sparkModules: "hudi-spark-datasource/hudi-spark3.0.x"
+#
+#          - scalaProfile: "scala-2.12"
+#            sparkProfile: "spark3.1"
+#            sparkModules: "hudi-spark-datasource/hudi-spark3.1.x"
+#
+#          - scalaProfile: "scala-2.12"
+#            sparkProfile: "spark3.2"
+#            sparkModules: "hudi-spark-datasource/hudi-spark3.2.x"
+#
+#          - scalaProfile: "scala-2.12"
+#            sparkProfile: "spark3.3"
+#            sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
+#
+#          - scalaProfile: "scala-2.12"
+#            sparkProfile: "spark3.4"
+#            sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
 
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.5"
@@ -112,7 +112,7 @@ jobs:
         SPARK_MODULES: ${{ matrix.sparkModules }}
       if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
       run:
-        mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
+        mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "hudi-spark-datasource/hudi-spark" $MVN_ARGS
     - name: FT - Spark
       env:
         SCALA_PROFILE: ${{ matrix.scalaProfile }}
@@ -120,7 +120,7 @@ jobs:
         SPARK_PROFILE: ${{ matrix.sparkProfile }}
         SPARK_MODULES: ${{ matrix.sparkModules }}
       if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
       run:
-        mvn test -Pfunctional-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
+        mvn test -Pfunctional-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "hudi-spark-datasource/hudi-spark" $MVN_ARGS
 
   test-hudi-hadoop-mr-and-hudi-java-client:
     runs-on: ubuntu-latest
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 044b6451cdfc7..bfbff5203d32c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -2334,6 +2334,77 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
+  test("yxchang test") {
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      val tableType = "mor"
+      val t1 = tableName + "_t1"
+      spark.sql(
+        s"""
+           |create table $t1 (
+           |  id bigint,
+           |  name string,
+           |  price double,
+           |  ts bigint
+           |) using hudi
+           |options (
+           |  type = '${tableType}',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           |)
+           |location '${tablePath}/${t1}'
+           |""".stripMargin)
+
+      spark.sql(s""" insert into ${t1} values(1, 'a1', 10.0, 1000) """)
+      spark.sql(s""" insert into ${t1} values(2, 'a2', 11.0, 1000) """)
+      println("yxchang: basic insert done")
+
+      val t1_p = tableName + "_t1_p"
+      // Test with a partitioned table
+      spark.sql(
+        s"""
+           |create table ${t1_p}(
+           |  id bigint,
+           |  name string,
+           |  price double,
+           |  ts bigint,
+           |  dt string
+           |) using hudi
+           |partitioned by (dt)
+           |options (
+           |  type = '${tableType}',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           |)
+           |location '${tablePath}/${t1_p}'
+           |""".stripMargin)
+
+      println("yxchang: Created " + t1_p)
+
+      // Insert into a static partition
+      spark.sql(
+        s"""
+           |insert into ${t1_p}
+           |partition(dt = '2021-05-07')
+           |select * from ${t1} """ // use values from the non-partitioned table
+          .stripMargin
+      )
+
+      println("yxchang: Inserted into partition for table: " + t1_p)
+
+      // Test insert overwrite with select
+      spark.sql(
+        s"""
+           |insert overwrite table ${t1_p}
+           |partition(dt = '2021-05-07')
+           |select * from ${t1}
+           |limit 10 """
+          .stripMargin
+      )
+    })
+  }
+
   def ingestAndValidateDataDupPolicy(tableType: String, tableName: String, tmp: File,
                                      expectedOperationtype: WriteOperationType = WriteOperationType.INSERT,
                                      setOptions: List[String] = List.empty,
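
Note: the test drives insert-overwrite-with-select end to end but only prints progress
markers and never asserts on table state. A closing assertion would pin the expected
semantics down. The sketch below is illustrative only and not part of the patch: it
assumes the `spark` session and the `t1`/`t1_p` tables from the test body above, and it
assumes the expected outcome that `limit 10` selects both rows of `t1` and that the
overwrite leaves the static partition holding exactly those rows.

    // Hypothetical validation step for the end of the test body (not in the patch).
    // Read back the static partition that was just overwritten.
    val overwritten = spark.sql(
        s"select id, name, price, ts from ${t1_p} where dt = '2021-05-07'")
      .collect()
      .map(r => (r.getLong(0), r.getString(1), r.getDouble(2), r.getLong(3)))
      .toSet

    // t1 holds two rows, so `limit 10` selects both; the overwrite should
    // replace the partition's previous contents with exactly these rows.
    assert(overwritten == Set((1L, "a1", 10.0, 1000L), (2L, "a2", 11.0, 1000L)))

Within this suite, the same check would more idiomatically go through the `checkAnswer`
helper that the surrounding tests in TestInsertTable.scala already use.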