[GJ-41] Support WS Transformer with row-based output (oap-project#42)
zzcclp authored Jan 20, 2022
1 parent 86a8e86 commit 6d7fe2c
Showing 11 changed files with 450 additions and 199 deletions.
98 changes: 98 additions & 0 deletions jvm/src/main/java/com/intel/oap/row/JniInstance.java
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.row;

import io.kyligence.jni.engine.LocalEngine;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Java API for the native row-based local engine. Serves as a wrapper
* around the local engine native library. This class is a singleton.
* The first call to {@link #getInstance()} initiates loading of
* liblocal_engine_jnid.so, either from a caller-supplied path or from
* the copy bundled in the jar.
*/
public class JniInstance {

private static JniInstance instance;

private String currLibPath = "";

private JniInstance() {
}

public static JniInstance getInstance() {
return getInstance(null);
}

public static synchronized JniInstance getInstance(String libPath) {
if (instance != null) {
return instance;
}

File file = null;
boolean libPathExists = false;
if (StringUtils.isNotBlank(libPath)) {
file = new File(libPath);
libPathExists = file.isFile(); // isFile() already implies existence
}
if (!libPathExists) {
// Extract the bundled library from the jar into a temporary file.
String soFileName = "/liblocal_engine_jnid.so";
try (InputStream is = JniInstance.class.getResourceAsStream(soFileName)) {
if (is == null) {
throw new IOException(soFileName + " not found on classpath");
}
file = File.createTempFile("lib", ".so");
try (OutputStream os = new FileOutputStream(file)) {
byte[] buffer = new byte[128 << 10];
int length;
while ((length = is.read(buffer)) != -1) {
os.write(buffer, 0, length);
}
}
} catch (IOException e) {
// Don't swallow extraction failures: loading a partial or missing
// library would fail later with a harder-to-diagnose error.
throw new RuntimeException("Failed to extract " + soFileName, e);
}
}
if (file != null) {
file.setReadable(true, false);
System.load(file.getAbsolutePath());
libPath = file.getAbsolutePath();
}
instance = new JniInstance();
instance.setCurrLibPath(libPath);
LocalEngine.initEngineEnv();
return instance;
}

public void setCurrLibPath(String currLibPath) {
this.currLibPath = currLibPath;
}

public LocalEngine buildLocalEngine(byte[] substraitPlan) {
return new LocalEngine(substraitPlan);
}
}
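JniInstance resolves the native library in two steps: a caller-supplied path wins if it points at an existing file, otherwise the bundled liblocal_engine_jnid.so is extracted from the jar to a temporary file before System.load. A minimal usage sketch follows; the library path and the empty plan bytes are illustrative placeholders, not part of this commit.

import com.intel.oap.row.JniInstance;
import io.kyligence.jni.engine.LocalEngine;

public class JniInstanceDemo {
    public static void main(String[] args) {
        // First call loads the .so once; later calls reuse the singleton.
        // Pass null (or a blank path) to fall back to the bundled copy.
        JniInstance jni = JniInstance.getInstance("/opt/native/liblocal_engine_jnid.so");
        byte[] plan = new byte[0]; // placeholder for a serialized Substrait plan
        LocalEngine engine = jni.buildLocalEngine(plan);
    }
}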
58 changes: 58 additions & 0 deletions jvm/src/main/java/com/intel/oap/row/RowIterator.java
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.row;

import io.kyligence.jni.engine.LocalEngine;

import java.io.IOException;

public class RowIterator {

private LocalEngine localEngine;
private boolean closed = false;

public RowIterator() throws IOException {}

public RowIterator(byte[] plan, String soFilePath) throws IOException {
this.localEngine = JniInstance.getInstance(soFilePath).buildLocalEngine(plan);
this.localEngine.execute();
}

public boolean hasNext() throws IOException {
return this.localEngine != null && this.localEngine.hasNext();
}

public SparkRowInfo next() throws IOException {
if (this.localEngine == null) {
return null;
}
return this.localEngine.next();
}

public void close() {
if (!closed) {
if (this.localEngine != null) {
try {
this.localEngine.close();
} catch (IOException e) {
// Best-effort release of native resources; nothing useful to do here.
}
}
closed = true;
}
}
}
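RowIterator is the intended entry point for consumers: the constructor builds a LocalEngine for the given Substrait plan and starts execution, after which results are drained with hasNext()/next(). A hedged consumption sketch, again with placeholder plan bytes:

import com.intel.oap.row.RowIterator;
import com.intel.oap.row.SparkRowInfo;

import java.io.IOException;

public class RowIteratorDemo {
    public static void main(String[] args) throws IOException {
        byte[] plan = new byte[0]; // placeholder for a serialized Substrait plan
        RowIterator it = new RowIterator(plan, null); // null path: use bundled .so
        try {
            while (it.hasNext()) {
                SparkRowInfo batch = it.next();
                System.out.println("rows: " + batch.offsets.length
                        + ", fields per row: " + batch.fieldsNum);
            }
        } finally {
            it.close(); // releases the native executor exactly once
        }
    }
}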
@@ -15,23 +15,18 @@
* limitations under the License.
*/

package com.intel.oap.vectorized;
package com.intel.oap.row;

/** POJO to hold the output file path of the designated partition id */
public class PartitionFileInfo {
private final int partitionId;
private final String filePath;
public class SparkRowInfo {
public long[] offsets;
public long[] lengths;
public long memoryAddress;
public long fieldsNum;

public PartitionFileInfo(int partitionId, String filePath) {
this.partitionId = partitionId;
this.filePath = filePath;
}

public int getPartitionId() {
return partitionId;
}

public String getFilePath() {
return filePath;
}
public SparkRowInfo(long[] offsets, long[] lengths, long memoryAddress, long fieldsNum) {
this.offsets = offsets;
this.lengths = lengths;
this.memoryAddress = memoryAddress;
this.fieldsNum = fieldsNum;
}
}
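SparkRowInfo describes a batch of rows laid out contiguously in native memory: memoryAddress is the base pointer and offsets[i]/lengths[i] locate row i. Assuming the native side emits Spark's UnsafeRow binary layout (plausible given these fields, though the diff does not state it), each row can be wrapped without copying:

import com.intel.oap.row.SparkRowInfo;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;

public class SparkRowInfoDemo {
    // Wrap row i as an UnsafeRow backed by off-heap memory (no copy).
    // Assumption: the native engine writes UnsafeRow-compatible bytes.
    static UnsafeRow rowAt(SparkRowInfo info, int i) {
        UnsafeRow row = new UnsafeRow((int) info.fieldsNum);
        row.pointTo(null, info.memoryAddress + info.offsets[i], (int) info.lengths[i]);
        return row;
    }
}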
46 changes: 46 additions & 0 deletions jvm/src/main/java/io/kyligence/jni/engine/LocalEngine.java
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.kyligence.jni.engine;

import java.io.Closeable;
import java.io.IOException;

import com.intel.oap.row.SparkRowInfo;

public class LocalEngine implements Closeable {
public static native long test(int a, int b);

public static native void initEngineEnv();

private long nativeExecutor;
private byte[] plan;

public LocalEngine(byte[] plan) {
this.plan = plan;
}

public native void execute();

public native boolean hasNext();

public native SparkRowInfo next();


@Override
public native void close() throws IOException;
}
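LocalEngine implements Closeable, so callers that drive it directly (rather than through RowIterator) can use try-with-resources to guarantee the native executor is released even if execute() throws. A sketch, with placeholder plan bytes:

import com.intel.oap.row.JniInstance;
import com.intel.oap.row.SparkRowInfo;
import io.kyligence.jni.engine.LocalEngine;

import java.io.IOException;

public class LocalEngineDemo {
    public static void main(String[] args) throws IOException {
        byte[] plan = new byte[0]; // placeholder for a serialized Substrait plan
        try (LocalEngine engine = JniInstance.getInstance().buildLocalEngine(plan)) {
            engine.execute();
            while (engine.hasNext()) {
                SparkRowInfo batch = engine.next();
                // consume batch here
            }
        } // close() runs here, releasing the native side
    }
}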
9 changes: 8 additions & 1 deletion jvm/src/main/scala/com/intel/oap/GazellePluginConfig.scala
@@ -17,7 +17,6 @@

package com.intel.oap

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf

@@ -128,6 +127,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
val enablePreferColumnar: Boolean =
conf.getConfString("spark.oap.sql.columnar.preferColumnar", "true").toBoolean

// This config specifies whether to use the columnar basic iterator;
// when false, the transformers fall back to row-based output.
val enableColumnarIterator: Boolean =
conf.getConfString("spark.oap.sql.columnar.iterator", "true").toBoolean

// This config is used for testing. Setting to false will disable loading native libraries.
val loadNative: Boolean =
conf.getConfString("spark.oap.sql.columnar.loadnative", "true").toBoolean
@@ -136,6 +139,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
val nativeLibName: String =
conf.getConfString("spark.oap.sql.columnar.libname", "spark_columnar_jni")

// This config specifies the absolute path of the native library.
val nativeLibPath: String =
conf.getConfString("spark.oap.sql.columnar.libpath", "")

// Fallback to row operators if there are several continuous joins.
val joinOptimizationThrottle: Integer =
conf.getConfString("spark.oap.sql.columnar.joinOptimizationLevel", "12").toInt
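Together, spark.oap.sql.columnar.iterator and spark.oap.sql.columnar.libpath let a deployment switch the transformers to row-based output and point at a prebuilt native library. A sketch of how they might be set; the path is an assumed install location:

import org.apache.spark.SparkConf;

public class GazelleConfDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                // false makes the transformers report supportsColumnar = false,
                // so they produce row-based output
                .set("spark.oap.sql.columnar.iterator", "false")
                // assumed install location of the row-based engine library
                .set("spark.oap.sql.columnar.libpath",
                        "/opt/native/liblocal_engine_jnid.so");
        // hand conf to the SparkSession builder as usual
    }
}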
@@ -21,8 +21,9 @@ import com.intel.oap.expression._
import com.intel.oap.substrait.expression.ExpressionNode
import com.intel.oap.substrait.rel.{RelBuilder, RelNode}
import com.intel.oap.substrait.SubstraitContext

import com.intel.oap.GazellePluginConfig
import org.apache.spark.SparkConf

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
@@ -43,7 +44,7 @@ case class ConditionProjectExecTransformer(

val sparkConf: SparkConf = sparkContext.getConf

override def supportsColumnar: Boolean = true
override def supportsColumnar: Boolean = GazellePluginConfig.getConf.enableColumnarIterator

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -19,10 +19,8 @@ package com.intel.oap.execution

import com.intel.oap.GazellePluginConfig
import com.intel.oap.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
import com.intel.oap.substrait.rel.{LocalFilesBuilder, RelBuilder}
import com.intel.oap.substrait.rel.RelBuilder
import com.intel.oap.substrait.SubstraitContext
import com.intel.oap.substrait.`type`.TypeBuiler
import com.intel.oap.substrait.expression.{ExpressionBuilder, ExpressionNode}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
@@ -34,14 +32,16 @@ import org.apache.spark.sql.vectorized.ColumnarBatch

class BatchScanExecTransformer(output: Seq[AttributeReference], @transient scan: Scan)
extends BatchScanExec(output, scan) with TransformSupport {
val tmpDir: String = GazellePluginConfig.getConf.tmpFile

val filterExprs: Seq[Expression] = if (scan.isInstanceOf[FileScan]) {
scan.asInstanceOf[FileScan].dataFilters
} else {
throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported")
}

override def supportsColumnar(): Boolean = true
override def supportsColumnar(): Boolean =
super.supportsColumnar && GazellePluginConfig.getConf.enableColumnarIterator

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
@@ -24,6 +24,7 @@ import com.intel.oap.expression._
import com.intel.oap.substrait.expression.{AggregateFunctionNode, ExpressionBuilder, ExpressionNode}
import com.intel.oap.substrait.rel.{RelBuilder, RelNode}
import com.intel.oap.substrait.SubstraitContext
import com.intel.oap.GazellePluginConfig
import java.util

import org.apache.spark.rdd.RDD
@@ -52,7 +53,7 @@ case class HashAggregateExecTransformer(

val sparkConf = sparkContext.getConf

override def supportsColumnar: Boolean = true
override def supportsColumnar: Boolean = GazellePluginConfig.getConf.enableColumnarIterator

val resAttributes: Seq[Attribute] = resultExpressions.map(_.toAttribute)

@@ -89,7 +90,7 @@

override def doValidate(): Boolean = {
var isPartial = true
aggregateExpressions.toList.foreach(aggExpr => {
aggregateExpressions.foreach(aggExpr => {
aggExpr.mode match {
case Partial =>
case _ => isPartial = false
@@ -175,7 +176,7 @@

// Get the aggregate function nodes
val aggregateFunctionList = new util.ArrayList[AggregateFunctionNode]()
groupingExpressions.toList.foreach(expr => {
groupingExpressions.foreach(expr => {
val groupingExpr: Expression = ExpressionConverter
.replaceWithExpressionTransformer(expr, originalInputAttributes)
val exprNode = groupingExpr.asInstanceOf[ExpressionTransformer].doTransform(args)
Expand All @@ -185,7 +186,7 @@ case class HashAggregateExecTransformer(
Lists.newArrayList(exprNode), outputTypeNode)
aggregateFunctionList.add(aggFunctionNode)
})
aggregateExpressions.toList.foreach(aggExpr => {
aggregateExpressions.foreach(aggExpr => {
val aggregateFunc = aggExpr.aggregateFunction
val functionId = AggregateFunctionsBuilder.create(args, aggregateFunc)
val mode = modeToKeyWord(aggExpr.mode)