diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 16fb1f683710f..4bd9ee03f96dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -584,15 +584,18 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { - if (row == null) { + if (row == null || currentVars != null) { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } val blocks = new ArrayBuffer[String]() val blockBuilder = new StringBuilder() for (code <- expressions) { - // We can't know how many byte code will be generated, so use the number of bytes as limit - if (blockBuilder.length > 64 * 1000) { + // We can't know how many bytecode will be generated, so use the length of source code + // as metric. A method should not go beyond 8K, otherwise it will not be JITted, should + // also not be too small, or it will have many function calls (for wide table), see the + // results in BenchmarkWideTable. + if (blockBuilder.length > 1024) { blocks.append(blockBuilder.toString()) blockBuilder.clear() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index cfc47aba889aa..bd7efa606e0ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -603,8 +603,6 @@ case class HashAggregateExec( // create grouping key ctx.currentVars = input - // make sure that the generated code will not be splitted as multiple functions - ctx.INPUT_ROW = null val unsafeRowKeyCode = GenerateUnsafeProjection.createCode( ctx, groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output))) val vectorizedRowKeys = ctx.generateExpressions( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala new file mode 100644 index 0000000000000..9dcaca0ca93ee --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.util.Benchmark + + +/** + * Benchmark to measure performance for wide table. + * To run this: + * build/sbt "sql/test-only *benchmark.BenchmarkWideTable" + * + * Benchmarks in this file are skipped in normal builds. + */ +class BenchmarkWideTable extends BenchmarkBase { + + ignore("project on wide table") { + val N = 1 << 20 + val df = sparkSession.range(N) + val columns = (0 until 400).map{ i => s"id as id$i"} + val benchmark = new Benchmark("projection on wide table", N) + benchmark.addCase("wide table", numIters = 5) { iter => + df.selectExpr(columns : _*).queryExecution.toRdd.count() + } + benchmark.run() + + /** + * Here are some numbers with different split threshold: + * + * Split threshold methods Rate(M/s) Per Row(ns) + * 10 400 0.4 2279 + * 100 200 0.6 1554 + * 1k 37 0.9 1116 + * 8k 5 0.5 2025 + * 64k 1 0.0 21649 + */ + } +}