Merge branch 'main' into 2024_03_22_ts_fix

liujiayi771 authored Mar 26, 2024
2 parents 6ffaa39 + 0d03c32 commit abaa255
Showing 29 changed files with 1,061 additions and 205 deletions.
87 changes: 87 additions & 0 deletions .github/workflows/build_bundle_package.yml
@@ -0,0 +1,87 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Build bundle package

concurrency:
  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
  cancel-in-progress: true

on:
  workflow_dispatch:
    inputs:
      os:
        description: 'OS version: ubuntu:20.04, ubuntu:22.04, centos:7 or centos:8'
        required: true
        default: 'ubuntu:20.04'
      spark:
        description: 'Spark version: spark-3.2, spark-3.3, spark-3.4 or spark-3.5'
        required: true
        default: 'spark-3.5'
      hadoop:
        description: 'Hadoop version: 2.7.4, 3.2.0, 3.2.2, 3.3.1, 3.3.3 or 3.3.6'
        required: true
        default: '3.3.3'

jobs:
  build-native-lib:
    runs-on: ubuntu-20.04
    container: inteldpo/gluten-centos-packaging:latest
    steps:
      - uses: actions/checkout@v3
      - name: Build Gluten velox third party
        run: |
          yum install sudo patch java-1.8.0-openjdk-devel -y && \
          cd $GITHUB_WORKSPACE/ep/build-velox/src && \
          ./get_velox.sh && \
          source /opt/rh/devtoolset-9/enable && \
          source /opt/gluten/dev/vcpkg/env.sh && \
          cd $GITHUB_WORKSPACE/ && \
          sed -i '/^headers/d' ep/build-velox/build/velox_ep/CMakeLists.txt && \
          export NUM_THREADS=4
          ./dev/builddeps-veloxbe.sh --build_tests=OFF --build_benchmarks=OFF --enable_s3=OFF \
            --enable_gcs=OFF --enable_hdfs=ON --enable_abfs=OFF
      - uses: actions/upload-artifact@v2
        with:
          path: ./cpp/build/releases/
          name: velox-native-lib-${{github.sha}}
          retention-days: 1

  build-packages:
    needs: build-native-lib
    runs-on: ubuntu-20.04
    container: ${{ github.event.inputs.os }}
    steps:
      - uses: actions/checkout@v3
      - name: Download All Artifacts
        uses: actions/download-artifact@v2
        with:
          name: velox-native-lib-${{github.sha}}
          path: ./cpp/build/releases
      - name: Setup java and maven
        run: |
          apt-get update && \
          apt-get install -y openjdk-8-jdk maven && \
          apt remove openjdk-11* -y
      - name: Build for Spark ${{ github.event.inputs.spark }}
        run: |
          cd $GITHUB_WORKSPACE/ && \
          mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip
      - name: Upload bundle package
        uses: actions/upload-artifact@v4
        with:
          name: gluten-velox-bundle-package
          path: package/target/gluten-velox-bundle-*.jar
          retention-days: 7
File renamed without changes.
575 changes: 575 additions & 0 deletions .github/workflows/velox_docker.yml

Large diffs are not rendered by default.

@@ -35,6 +35,7 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
import org.apache.spark.shuffle.utils.CHShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.CHAggregateFunctionRewriteRule
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
@@ -518,6 +519,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
  override def genExtendedOptimizers(): List[SparkSession => Rule[LogicalPlan]] = {
    List(
      spark => new CommonSubexpressionEliminateRule(spark, spark.sessionState.conf),
      spark => CHAggregateFunctionRewriteRule(spark),
      _ => CountDistinctWithoutExpand
    )
  }
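Note: each entry in the list above has the shape SparkSession => Rule[LogicalPlan], which is exactly what Spark's public SparkSessionExtensions.injectOptimizerRule expects. The snippet below is a minimal, hypothetical sketch of how a rule such as the new CHAggregateFunctionRewriteRule (added later in this diff) could be attached to a session through that public hook; Gluten's real registration lives in its own session extensions and may differ in detail.

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.CHAggregateFunctionRewriteRule

// Hypothetical, standalone illustration; not Gluten's actual registration code.
class ExampleGlutenLikeExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // The builder shape matches the entries returned by genExtendedOptimizers.
    extensions.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark))
  }
}

A session would opt in with spark.sql.extensions set to the fully qualified name of such an extensions class.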
@@ -764,4 +766,8 @@
  ): GenerateExecTransformerBase = {
    CHGenerateExecTransformer(generator, requiredChildOutput, outer, generatorOutput, child)
  }

  override def genPreProjectForGenerate(generate: GenerateExec): SparkPlan = generate

  override def genPostProjectForGenerate(generate: GenerateExec): SparkPlan = generate
}
@@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.catalyst

import io.glutenproject.GlutenConfig

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types._

/**
 * Avg(Int) function: CH uses the input type for the intermediate sum type, while Spark uses
 * double, so the avg child needs a cast to double.
 * @param spark
 */
case class CHAggregateFunctionRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case a: Aggregate =>
      a.transformExpressions {
        case avgExpr @ AggregateExpression(avg: Average, _, _, _, _)
            if GlutenConfig.getConf.enableCastAvgAggregateFunction &&
              GlutenConfig.getConf.enableColumnarHashAgg &&
              !avgExpr.isDistinct && isDataTypeNeedConvert(avg.child.dataType) =>
          AggregateExpression(
            avg.copy(child = Cast(avg.child, DoubleType)),
            avgExpr.mode,
            avgExpr.isDistinct,
            avgExpr.filter,
            avgExpr.resultId
          )
      }
  }

  private def isDataTypeNeedConvert(dataType: DataType): Boolean = {
    dataType match {
      case FloatType => true
      case IntegerType => true
      case LongType => true
      case ShortType => true
      case _ => false
    }
  }
}
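To make the effect of the rule concrete, here is a small self-contained sketch (assuming only Spark's Catalyst classes on the classpath; the object name AvgRewriteSketch is illustrative, not part of this change) that applies the same child-cast rewrite to a single avg expression over an integer attribute.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Complete}
import org.apache.spark.sql.types.{DoubleType, IntegerType}

object AvgRewriteSketch {
  def main(args: Array[String]): Unit = {
    val id = AttributeReference("id", IntegerType)()
    // What the analyzer produces for avg(id) over an int column.
    val before = AggregateExpression(Average(id), Complete, isDistinct = false)
    // What the rule rewrites it to: the cast moves the intermediate sum to double.
    val after = before.copy(aggregateFunction = Average(Cast(id, DoubleType)))
    println(before.sql) // avg(id)
    println(after.sql)  // avg(CAST(id AS DOUBLE))
  }
}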
@@ -639,4 +639,25 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
    val sql = "select cast(concat(' ', cast(id as string)) as bigint) from range(10)"
    runQueryAndCompare(sql)(checkOperatorMatch[ProjectExecTransformer])
  }

  test("avg(bigint) overflow") {
    withSQLConf(
      "spark.gluten.sql.columnar.forceShuffledHashJoin" -> "false",
      "spark.sql.autoBroadcastJoinThreshold" -> "-1") {
      withTable("big_int") {
        sql("create table big_int(id bigint) using parquet")
        sql("""
              |insert into big_int values (9223372036854775807),
              |(9223372036854775807),
              |(9223372036854775807),
              |(9223372036854775807)
              |""".stripMargin)
        val q = "select avg(id) from big_int"
        runQueryAndCompare(q)(checkOperatorMatch[CHHashAggregateExecTransformer])
        val distinctSQL = "select count(distinct id), avg(distinct id), avg(id) from big_int"
        runQueryAndCompare(distinctSQL)(checkOperatorMatch[CHHashAggregateExecTransformer])
      }
    }
  }

}
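The test inserts Long.MaxValue four times precisely so that a 64-bit intermediate sum overflows. The toy snippet below (plain Scala, no Spark required; the object name is illustrative) shows why the cast-to-double rewrite matters for avg over bigint.

object AvgOverflowSketch {
  def main(args: Array[String]): Unit = {
    val v = Long.MaxValue          // 9223372036854775807, the value inserted four times above
    val longSum = v + v + v + v    // 64-bit addition wraps around to -4, so the average goes wrong
    val doubleSum = v.toDouble * 4 // about 3.69e19, representable (approximately) as a double
    println(longSum)               // -4
    println(doubleSum / 4)         // 9.223372036854776E18
  }
}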
@@ -20,5 +20,5 @@ public class UdfJniWrapper {

  public UdfJniWrapper() {}

  public native void nativeLoadUdfLibraries(String udfLibPaths);
  public native void getFunctionSignatures();
}
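On the Scala side, the signatures fetched through getFunctionSignatures are surfaced by genInjectedFunctions as (FunctionIdentifier, ExpressionInfo, FunctionBuilder) triples (see the Velox changes below). The sketch that follows only illustrates the shape of such a triple with a made-up my_upper function backed by Spark's built-in Upper expression; it is not Gluten's UDFResolver logic.

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Upper}

object InjectedFunctionSketch {
  // Hypothetical entry: register "my_upper", delegating to the built-in Upper expression.
  val example: (FunctionIdentifier, ExpressionInfo, FunctionBuilder) = (
    FunctionIdentifier("my_upper"),
    new ExpressionInfo(classOf[Upper].getName, "my_upper"),
    (children: Seq[Expression]) => Upper(children.head)
  )
}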
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RoundRobinPartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{BroadcastUtils, ColumnarBuildSideRelation, ColumnarShuffleExchangeExec, SparkPlan, VeloxColumnarWriteFilesExec}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.BuildSideRelation
@@ -649,7 +649,7 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {

  override def genInjectedFunctions()
      : Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
    UDFResolver.loadAndGetFunctionDescriptions
    UDFResolver.getFunctionSignatures
  }

  override def rewriteSpillPath(path: String): String = {
@@ -677,4 +677,12 @@
  ): GenerateExecTransformerBase = {
    GenerateExecTransformer(generator, requiredChildOutput, outer, generatorOutput, child)
  }

  override def genPreProjectForGenerate(generate: GenerateExec): SparkPlan = {
    PullOutGenerateProjectHelper.pullOutPreProject(generate)
  }

  override def genPostProjectForGenerate(generate: GenerateExec): SparkPlan = {
    PullOutGenerateProjectHelper.pullOutPostProject(generate)
  }
}
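The new hooks differ per backend: the ClickHouse backend (earlier in this diff) leaves the GenerateExec untouched, while the Velox backend delegates to PullOutGenerateProjectHelper. The plain-Spark sketch below (no Gluten required; the object name is illustrative) produces the kind of plan these hooks target, an explode over a computed expression, where a pre-project would presumably materialize split(s, ',') in a Project below the Generate so the generator's child becomes a simple attribute.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

object GeneratePlanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("generate-plan-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("a,b,c", "d,e").toDF("s")
    // The plan contains Generate(explode(split(s, ','))); a pre-project rewrite
    // would first compute split(s, ',') in a child Project.
    df.select(explode(split($"s", ","))).explain(true)
    spark.stop()
  }
}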