diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/vectorized/CloseableCHColumnBatchIterator.scala b/backends-clickhouse/src/main/scala/io/glutenproject/vectorized/CloseableCHColumnBatchIterator.scala
index 90be33fe43ef..55daa8a46abc 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/vectorized/CloseableCHColumnBatchIterator.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/vectorized/CloseableCHColumnBatchIterator.scala
@@ -32,11 +32,15 @@ class CloseableCHColumnBatchIterator(itr: Iterator[ColumnarBatch],
     pipelineTime: Option[SQLMetric] = None
 ) extends Iterator[ColumnarBatch]
   with Logging {
   var cb: ColumnarBatch = null
+  var scanTime = 0L
 
   override def hasNext: Boolean = {
     val beforeTime = System.nanoTime()
     val res = itr.hasNext
-    pipelineTime.map(t => t += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeTime))
+    scanTime += System.nanoTime() - beforeTime
+    if (!res) {
+      pipelineTime.foreach(t => t += TimeUnit.NANOSECONDS.toMillis(scanTime))
+    }
     res
   }
 
@@ -49,7 +53,7 @@ class CloseableCHColumnBatchIterator(itr: Iterator[ColumnarBatch],
     val beforeTime = System.nanoTime()
     closeCurrentBatch()
     cb = itr.next()
-    pipelineTime.map(t => t += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeTime))
+    scanTime += System.nanoTime() - beforeTime
     cb
   }
 
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/benchmarks/DSV2BenchmarkTest.scala b/backends-clickhouse/src/test/scala/io/glutenproject/benchmarks/DSV2BenchmarkTest.scala
index 9bcce88d9c5e..67fd3e5a0101 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/benchmarks/DSV2BenchmarkTest.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/benchmarks/DSV2BenchmarkTest.scala
@@ -70,18 +70,18 @@ object DSV2BenchmarkTest {
 
     val sessionBuilderTmp = SparkSession
       .builder()
-      .appName("Gluten-Benchmark")
+      .appName("Gluten-TPCH-Benchmark")
 
     val libPath = "/home/myubuntu/Works/c_cpp_projects/Kyligence-ClickHouse-1/" +
       "cmake-build-release/utils/local-engine/libch.so"
     val sessionBuilder = if (!configed) {
       val sessionBuilderTmp1 = sessionBuilderTmp
-        .master("local[8]")
+        .master("local[6]")
         .config("spark.driver.memory", "30G")
         .config("spark.driver.memoryOverhead", "10G")
         .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
         .config("spark.default.parallelism", 1)
-        .config("spark.sql.shuffle.partitions", 8)
+        .config("spark.sql.shuffle.partitions", 6)
         .config("spark.sql.adaptive.enabled", "false")
         .config("spark.sql.files.maxPartitionBytes", 1024 << 10 << 10) // default is 128M
         .config("spark.sql.files.openCostInBytes", 1024 << 10 << 10) // default is 4M
@@ -119,13 +119,14 @@ object DSV2BenchmarkTest {
         .config("spark.gluten.sql.columnar.iterator", "true")
         .config("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
         .config("spark.gluten.sql.enable.native.validation", "false")
+        .config("spark.gluten.sql.columnar.separate.scan.rdd.for.ch", "true")
         // .config("spark.gluten.sql.columnar.extension.scan.rdd", "false")
         // .config("spark.gluten.sql.columnar.sort", "false")
         // .config("spark.sql.codegen.wholeStage", "false")
         .config("spark.sql.autoBroadcastJoinThreshold", "10MB")
         .config("spark.sql.exchange.reuse", "true")
         .config("spark.gluten.sql.columnar.forceshuffledhashjoin", "true")
-        .config("spark.gluten.sql.columnar.coalesce.batches", "true")
+        .config("spark.gluten.sql.columnar.coalesce.batches", "false")
         // .config("spark.gluten.sql.columnar.filescan", "true")
         // .config("spark.sql.optimizeNullAwareAntiJoin", "false")
         // .config("spark.sql.join.preferSortMergeJoin", "false")
@@ -198,9 +199,9 @@ object DSV2BenchmarkTest {
     // createTempView(spark, "/data1/test_output/tpch-data-sf10", "parquet")
     // createGlobalTempView(spark)
     // testJoinIssue(spark)
-    testTPCHOne(spark, executedCnt)
+    // testTPCHOne(spark, executedCnt)
     // testSepScanRDD(spark, executedCnt)
-    // testTPCHAll(spark)
+    testTPCHAll(spark)
     // benchmarkTPCH(spark, executedCnt)
 
     System.out.println("waiting for finishing")
@@ -232,7 +233,6 @@ object DSV2BenchmarkTest {
         | AND l_shipdate < date'1994-01-01' + interval 1 year
         | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
         | AND l_quantity < 24;
-        |
         |""".stripMargin) // .show(30, false)
     df.explain(false)
     val plan = df.queryExecution.executedPlan
diff --git a/backends-velox/src/main/scala/io/glutenproject/vectorized/CloseableColumnBatchIterator.scala b/backends-velox/src/main/scala/io/glutenproject/vectorized/CloseableColumnBatchIterator.scala
index 77b8772610ab..ae1a88d7d53c 100644
--- a/backends-velox/src/main/scala/io/glutenproject/vectorized/CloseableColumnBatchIterator.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/vectorized/CloseableColumnBatchIterator.scala
@@ -34,11 +34,15 @@ class CloseableColumnBatchIterator(itr: Iterator[ColumnarBatch],
   extends Iterator[ColumnarBatch]
     with Logging {
   var cb: ColumnarBatch = _
+  var scanTime = 0L
 
   override def hasNext: Boolean = {
     val beforeTime = System.nanoTime()
     val res = itr.hasNext
-    pipelineTime.foreach(t => t += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeTime))
+    scanTime += System.nanoTime() - beforeTime
+    if (!res) {
+      pipelineTime.foreach(t => t += TimeUnit.NANOSECONDS.toMillis(scanTime))
+    }
     res
   }
 
@@ -50,7 +54,7 @@ class CloseableColumnBatchIterator(itr: Iterator[ColumnarBatch],
     val beforeTime = System.nanoTime()
     closeCurrentBatch()
     cb = itr.next()
-    pipelineTime.foreach(t => t += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeTime))
+    scanTime += System.nanoTime() - beforeTime
     cb
   }