Skip to content

Commit

Permalink
Merge branch 'main' into agg
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn authored Jan 23, 2024
2 parents 11a85dd + deacda6 commit d031d69
Show file tree
Hide file tree
Showing 56 changed files with 1,274 additions and 1,067 deletions.
53 changes: 28 additions & 25 deletions .github/workflows/velox_be.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ env:
HTTP_PROXY_PORT: 911
PATH_TO_GLUTEN_TE: ./tools/gluten-te
DOCKER_PULL_REGISTRY: 10.1.0.25:5000
LEGACY_DOCKER_REGISTRY: 10.0.2.4:5000
MAVEN_OPTS: -Dmaven.wagon.http.retryHandler.count=3

concurrency:
Expand All @@ -65,7 +64,7 @@ jobs:
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON --build_test_utils=ON'
./build_velox.sh --run_setup_script=OFF --velox_home=/opt/velox --enable_ep_cache=ON --build_test_utils=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand Down Expand Up @@ -112,7 +111,7 @@ jobs:
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON'
./build_velox.sh --run_setup_script=OFF --velox_home=/opt/velox --enable_ep_cache=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand Down Expand Up @@ -151,7 +150,7 @@ jobs:
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON'
./build_velox.sh --run_setup_script=OFF --velox_home=/opt/velox --enable_ep_cache=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand All @@ -169,6 +168,16 @@ jobs:
--local --preset=velox --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.3 Q38 flush
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh 'cd /opt/gluten/tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 --queries=q38 \
--disable-bhj \
--extra-conf=spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.1 \
--extra-conf=spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.2 \
--extra-conf=spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100 \
--extra-conf=spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0'
- name: Exit docker container
if: ${{ always() }}
run: |
Expand All @@ -189,7 +198,7 @@ jobs:
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON'
./build_velox.sh --run_setup_script=OFF --velox_home=/opt/velox --enable_ep_cache=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand Down Expand Up @@ -220,7 +229,7 @@ jobs:
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON'
./build_velox.sh --run_setup_script=OFF --velox_home=/opt/velox --enable_ep_cache=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand Down Expand Up @@ -258,7 +267,7 @@ jobs:
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON'
./build_velox.sh --run_setup_script=OFF --velox_home=/opt/velox --enable_ep_cache=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand Down Expand Up @@ -289,7 +298,7 @@ jobs:
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON'
./build_velox.sh --run_setup_script=OFF --velox_home=/opt/velox --enable_ep_cache=ON --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand Down Expand Up @@ -341,7 +350,7 @@ jobs:
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON'
./build_velox.sh --run_setup_script=OFF --velox_home=/opt/velox --enable_ep_cache=ON --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand Down Expand Up @@ -396,7 +405,7 @@ jobs:
sudo yum -y install patch && \
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON'
./build_velox.sh --run_setup_script=OFF --velox_home=/opt/velox --enable_ep_cache=ON --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand Down Expand Up @@ -444,8 +453,9 @@ jobs:
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
yum -y install epel-release centos-release-scl patch sudo && \
cd /opt/gluten/ep/build-velox/src && \
source /opt/rh/devtoolset-9/enable && \
./get_velox.sh --velox_home=/opt/velox --enable_hdfs=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON --enable_hdfs=ON'
./build_velox.sh --run_setup_script=ON --velox_home=/opt/velox --enable_ep_cache=ON --enable_s3=ON --enable_gcs=ON --enable_abfs=ON --enable_hdfs=ON'
- name: Build Gluten CPP library
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
Expand Down Expand Up @@ -524,7 +534,7 @@ jobs:
- name: Setup docker container
run: |
docker run --rm --init --privileged --ulimit nofile=65536:65536 --ulimit core=-1 --security-opt seccomp=unconfined \
-v $PWD:/opt/gluten --name static-build-test-$GITHUB_RUN_ID -e NUM_THREADS=30 -detach $LEGACY_DOCKER_REGISTRY/gluten-dev/centos:7 \
-v $PWD:/opt/gluten --name static-build-test-$GITHUB_RUN_ID -e NUM_THREADS=30 -detach $DOCKER_PULL_REGISTRY/gluten-te/gluten-buildenv-centos:7 \
bash -c 'cd /opt/gluten && sleep 14400'
- name: Build Gluten CPP library
run: |
Expand All @@ -542,29 +552,22 @@ jobs:
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -Pdelta -DskipTests && \
cd /opt/gluten/tools/gluten-it && \
mvn clean install -Pspark-3.2'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 (centos 8)
run: |
docker run --rm --init --privileged --ulimit nofile=65536:65536 --ulimit core=-1 --security-opt seccomp=unconfined \
-v $PWD:/opt/gluten --name static-build-test-$GITHUB_RUN_ID-tpc -e NUM_THREADS=30 $LEGACY_DOCKER_REGISTRY/gluten-dev/centos:8 \
bash -c 'cd /opt/gluten/tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=h --error-on-memleak --disable-aqe --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx10G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=20g -s=1.0 --threads=32 --iterations=1'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 (ubuntu 20.04)
run: |
docker run --rm --init --privileged --ulimit nofile=65536:65536 --ulimit core=-1 --security-opt seccomp=unconfined \
-v $PWD:/opt/gluten --name static-build-test-$GITHUB_RUN_ID-tpc -e NUM_THREADS=30 $LEGACY_DOCKER_REGISTRY/gluten-dev/ubuntu:20.04 \
'cd /opt/gluten/tools/gluten-it \
-v $PWD:/opt/gluten --name static-build-test-$GITHUB_RUN_ID-tpc -e NUM_THREADS=30 ubuntu:20.04 \
bash -c 'apt-get update -y && DEBIAN_FRONTEND=noninteractive apt-get install openjdk-8-jre -y \
&& cd /opt/gluten/tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=h --error-on-memleak --disable-aqe --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx10G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=20g -s=1.0 --threads=32 --iterations=1'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 (ubuntu 22.04)
run: |
docker run --rm --init --privileged --ulimit nofile=65536:65536 --ulimit core=-1 --security-opt seccomp=unconfined \
-v $PWD:/opt/gluten --name static-build-test-$GITHUB_RUN_ID-tpc -e NUM_THREADS=30 $LEGACY_DOCKER_REGISTRY/gluten-dev/ubuntu:22.04 \
'cd /opt/gluten/tools/gluten-it \
-v $PWD:/opt/gluten --name static-build-test-$GITHUB_RUN_ID-tpc -e NUM_THREADS=30 ubuntu:22.04 \
bash -c 'apt-get update -y && DEBIAN_FRONTEND=noninteractive apt-get install openjdk-8-jre -y \
&& cd /opt/gluten/tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=h --error-on-memleak --disable-aqe --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx10G sbin/gluten-it.sh queries-compare \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,7 @@ class RangePartitionerBoundsGenerator[K: Ordering: ClassTag, V](
node.put("column_ref", index)
node.put("data_type", ordering.dataType.toString)
node.put("is_nullable", ordering.nullable)
node.put(
"direction",
SortExecTransformer.transformSortDirection(
ordering.direction.sql,
ordering.nullOrdering.sql))
node.put("direction", SortExecTransformer.transformSortDirection(ordering))
arrayNode.add(node)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite

assert(plans(4).metrics("numFiles").value === 1)
assert(plans(4).metrics("pruningTime").value === -1)
assert(plans(4).metrics("filesSize").value === 17777735)
assert(plans(4).metrics("filesSize").value === 19230111)
assert(plans(4).metrics("outputRows").value === 600572)

assert(plans(3).metrics("inputRows").value === 591673)
Expand Down Expand Up @@ -99,7 +99,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite

assert(plans(2).metrics("numFiles").value === 1)
assert(plans(2).metrics("pruningTime").value === -1)
assert(plans(2).metrics("filesSize").value === 17777735)
assert(plans(2).metrics("filesSize").value === 19230111)

assert(plans(1).metrics("inputRows").value === 591673)
assert(plans(1).metrics("outputRows").value === 4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite

assert(plans(2).metrics("numFiles").value === 1)
assert(plans(2).metrics("pruningTime").value === -1)
assert(plans(2).metrics("filesSize").value === 17777735)
assert(plans(2).metrics("filesSize").value === 19230111)

assert(plans(1).metrics("outputRows").value === 4)
assert(plans(1).metrics("outputVectors").value === 1)
Expand Down Expand Up @@ -108,7 +108,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite

assert(plans(2).metrics("numFiles").value === 1)
assert(plans(2).metrics("pruningTime").value === -1)
assert(plans(2).metrics("filesSize").value === 17777735)
assert(plans(2).metrics("filesSize").value === 19230111)

assert(plans(1).metrics("outputRows").value === 4)
assert(plans(1).metrics("outputVectors").value === 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ object CHParquetReadBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark
}
val planNode =
PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformContext.root), outNames)
val fileFormat = ConverterUtils.getFileFormat(chFileScan)

val nativeFileScanRDD = BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD(
spark.sparkContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.{AggregateFunctionRewriteRule, FlushableHas
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, CreateNamedStruct, ElementAt, Expression, ExpressionInfo, GetArrayItem, GetMapValue, GetStructField, Literal, NamedExpression, StringSplit, StringTrim}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, CreateNamedStruct, ElementAt, Expression, ExpressionInfo, GetArrayItem, GetMapValue, GetStructField, If, IsNaN, Literal, NamedExpression, NaNvl, StringSplit, StringTrim}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, HLLAdapter}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.JoinType
Expand Down Expand Up @@ -112,6 +112,23 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
new IfThenNode(Lists.newArrayList(lessThanFuncNode), Lists.newArrayList(nullNode), resultNode)
}

/**
 * Transforms Spark's `NaNvl(left, right)` by rewriting it as the conditional
 * `If(IsNaN(left), right, left)` and wrapping that in an [[IfTransformer]].
 *
 * @param substraitExprName
 *   Substrait name registered for NaNvl; not used by this implementation because the expression
 *   is emitted as an if/then over the `IsNaN` function instead.
 * @param left
 *   transformer for the first NaNvl operand (the value tested for NaN)
 * @param right
 *   transformer for the second NaNvl operand (the replacement value)
 * @param original
 *   the Catalyst NaNvl expression being transformed
 * @return
 *   an [[IfTransformer]] built from the `IsNaN` check, `right`, `left`, and the rewritten `If`
 */
override def genNaNvlTransformer(
    substraitExprName: String,
    left: ExpressionTransformer,
    right: ExpressionTransformer,
    original: NaNvl): ExpressionTransformer = {
  // The NaN test is applied to the first operand only.
  val isNaNCheck = IsNaN(original.left)
  val isNaNTransformer = GenericExpressionTransformer(
    ExpressionMappings.expressionsMap(classOf[IsNaN]),
    Seq(left),
    isNaNCheck)
  // Catalyst-side equivalent of the rewritten expression, passed along as the "original".
  val rewritten = If(isNaNCheck, original.right, original.left)
  IfTransformer(isNaNTransformer, right, left, rewritten)
}

/**
* * Plans.
*/
Expand Down Expand Up @@ -518,7 +535,8 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
override def extraExpressionMappings: Seq[Sig] = {
Seq(
Sig[HLLAdapter](ExpressionNames.APPROX_DISTINCT),
Sig[UDFExpression](ExpressionNames.UDF_PLACEHOLDER))
Sig[UDFExpression](ExpressionNames.UDF_PLACEHOLDER),
Sig[NaNvl](ExpressionNames.NANVL))
}

override def genInjectedFunctions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,32 @@ object BackendSettings extends BackendSettingsApi {
validateTypes(typeValidator)
case DwrfReadFormat => ValidationResult.ok
case OrcReadFormat =>
val typeValidator: PartialFunction[StructField, String] = {
case StructField(_, ByteType, _, _) => "ByteType not support"
case StructField(_, arrayType: ArrayType, _, _)
if arrayType.elementType.isInstanceOf[StructType] =>
"StructType as element in ArrayType"
case StructField(_, arrayType: ArrayType, _, _)
if arrayType.elementType.isInstanceOf[ArrayType] =>
"ArrayType as element in ArrayType"
case StructField(_, mapType: MapType, _, _) if mapType.keyType.isInstanceOf[StructType] =>
"StructType as Key in MapType"
case StructField(_, mapType: MapType, _, _)
if mapType.valueType.isInstanceOf[ArrayType] =>
"ArrayType as Value in MapType"
case StructField(_, stringType: StringType, _, metadata)
if CharVarcharUtils
.getRawTypeString(metadata)
.getOrElse(stringType.catalogString) != stringType.catalogString =>
CharVarcharUtils.getRawTypeString(metadata) + " not support"
case StructField(_, TimestampType, _, _) => "TimestampType not support"
if (!GlutenConfig.getConf.veloxOrcScanEnabled) {
ValidationResult.notOk(s"Velox ORC scan is turned off.")
} else {
val typeValidator: PartialFunction[StructField, String] = {
case StructField(_, ByteType, _, _) => "ByteType not support"
case StructField(_, arrayType: ArrayType, _, _)
if arrayType.elementType.isInstanceOf[StructType] =>
"StructType as element in ArrayType"
case StructField(_, arrayType: ArrayType, _, _)
if arrayType.elementType.isInstanceOf[ArrayType] =>
"ArrayType as element in ArrayType"
case StructField(_, mapType: MapType, _, _)
if mapType.keyType.isInstanceOf[StructType] =>
"StructType as Key in MapType"
case StructField(_, mapType: MapType, _, _)
if mapType.valueType.isInstanceOf[ArrayType] =>
"ArrayType as Value in MapType"
case StructField(_, stringType: StringType, _, metadata)
if CharVarcharUtils
.getRawTypeString(metadata)
.getOrElse(stringType.catalogString) != stringType.catalogString =>
CharVarcharUtils.getRawTypeString(metadata) + " not support"
case StructField(_, TimestampType, _, _) => "TimestampType not support"
}
validateTypes(typeValidator)
}
validateTypes(typeValidator)
case _ => ValidationResult.notOk(s"Unsupported file format for $format.")
}
}
Expand Down
Loading

0 comments on commit d031d69

Please sign in to comment.