Merge pull request #966 from peacewong/dev-1.0.2
Solve the problem of compatibility between Spark and Hive versions
Alexkun authored Aug 19, 2021
2 parents 0850689 + ba6eb88 commit d87a821
Showing 5 changed files with 18 additions and 54 deletions.
@@ -18,12 +18,10 @@


import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.*;
import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.objectinspector.*;
@@ -57,31 +55,6 @@ public Object doDeserialize(Writable field) throws SerDeException {
throw new SerDeException("DelimitedJSONSerDe cannot deserialize.");
}

@Override
protected void serializeField(ByteStream.Output out, Object obj, ObjectInspector objInspector,
LazySerDeParameters serdeParams) throws SerDeException {
if (!objInspector.getCategory().equals(ObjectInspector.Category.PRIMITIVE) || (objInspector.getTypeName().equalsIgnoreCase(serdeConstants.BINARY_TYPE_NAME))) {
//do this for all complex types and binary
try {
serialize(out, SerDeUtils.getJSONString(obj, objInspector, serdeParams.getNullSequence().toString()),
PrimitiveObjectInspectorFactory.javaStringObjectInspector, serdeParams.getSeparators(),
1, serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(),
serdeParams.getNeedsEscape());

} catch (IOException e) {
throw new SerDeException(e);
}

} else {
//primitives except binary
try {
serialize(out, obj, objInspector, serdeParams.getSeparators(), 1, serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams.getNeedsEscape());
} catch (IOException e) {
throw new SerDeException(e);
}
}
}


public static void serialize(ByteStream.Output out, Object obj, ObjectInspector objInspector, byte[] separators, int level, Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape) throws IOException, SerDeException {
if (obj == null) {
@@ -247,23 +220,14 @@ private static void writePrimitiveUTF8(OutputStream out, Object o,
binaryData = Base64.encodeBase64(String.valueOf(tw).getBytes());
break;
}
case INTERVAL_YEAR_MONTH: {
HiveIntervalYearMonthWritable hw = ((HiveIntervalYearMonthObjectInspector) oi).getPrimitiveWritableObject(o);
binaryData = Base64.encodeBase64(String.valueOf(hw).getBytes());
break;
}
case INTERVAL_DAY_TIME: {
HiveIntervalDayTimeWritable ht = ((HiveIntervalDayTimeObjectInspector) oi).getPrimitiveWritableObject(o);
binaryData = Base64.encodeBase64(String.valueOf(ht).getBytes());
break;
}
case DECIMAL: {
HiveDecimalObjectInspector decimalOI = (HiveDecimalObjectInspector) oi;
binaryData = Base64.encodeBase64(String.valueOf(decimalOI).getBytes());
break;
}
default: {
throw new RuntimeException("Unknown primitive type: " + category);
Object hw = oi.getPrimitiveWritableObject(o);
binaryData = Base64.encodeBase64(String.valueOf(hw).getBytes());
}
}
if(binaryData == null){
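Note: the removals above drop the serializeField override (which depended on LazySerDeParameters, a class that is not present under that name in every Hive release) and collapse the INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME and DECIMAL branches of writePrimitiveUTF8 into a generic default branch, presumably so DelimitedJSONSerDe stays compilable and loadable against Hive builds where those writable classes are missing or relocated. A minimal Scala sketch of that generic fallback, using only Hive API classes visible in the imports above:

    import org.apache.commons.codec.binary.Base64
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector

    // For categories without a dedicated branch, ask the ObjectInspector for its
    // primitive writable and Base64-encode its string form; no version-specific
    // writable classes (HiveIntervalYearMonthWritable, etc.) are referenced.
    def encodeGenericPrimitive(oi: PrimitiveObjectInspector, o: AnyRef): Array[Byte] = {
      val writable = oi.getPrimitiveWritableObject(o)
      Base64.encodeBase64(String.valueOf(writable).getBytes())
    }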
@@ -64,11 +64,9 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
if (BDP_QUEUE_NAME.equals(k)) hiveConf.set(HIVE_QUEUE_NAME, v) else hiveConf.set(k, v)
}
hiveConf.setVar(HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH, HiveEngineConfiguration.HIVE_LIB_HOME.getValue + "/*")
if(HiveEngineConfiguration.ENABLE_FETCH_BASE64){
hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE,HiveEngineConfiguration.BASE64_SERDE_CLASS)
if(HiveEngineConfiguration.ENABLE_FETCH_BASE64) {
hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, HiveEngineConfiguration.BASE64_SERDE_CLASS)
hiveConf.set("enable_fetch_base64","true")
}else{
hiveConf.set("enable_fetch_base64","false")
}
/* //Update by peaceWong add hook to HiveDriver
if (StringUtils.isNotBlank(EnvConfiguration.LINKIS_HIVE_POST_HOOKS)) {
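Note: this block only reconfigures HiveConf when base64 fetch output is enabled. A minimal sketch of the wiring, with the flag and SerDe class name passed in explicitly (in the real code they come from HiveEngineConfiguration):

    import org.apache.hadoop.hive.conf.HiveConf

    def configureFetchSerde(hiveConf: HiveConf, enableBase64: Boolean, serdeClass: String): Unit = {
      if (enableBase64) {
        // route fetch-task output through the base64 SerDe and record the flag
        hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, serdeClass)
        hiveConf.set("enable_fetch_base64", "true")
      }
    }

It appears the else branch that wrote "false" was dropped, so readers of the enable_fetch_base64 flag would treat its absence as false.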
@@ -18,7 +18,7 @@ package com.webank.wedatasphere.linkis.engineplugin.spark.executor

import java.text.NumberFormat

import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import com.webank.wedatasphere.linkis.engineplugin.spark.config.SparkConfiguration
import com.webank.wedatasphere.linkis.engineplugin.spark.exception.SparkEngineException
@@ -30,7 +30,6 @@ import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, Ta
import com.webank.wedatasphere.linkis.storage.{LineMetaData, LineRecord}
import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

@@ -45,7 +44,7 @@ object SQLSession extends Logging {
def showDF(sc: SparkContext, jobGroup: String, dataFrame: DataFrame, alias: String, maxResult: Int, engineExecutionContext: EngineExecutionContext): Unit = {
//
if (sc.isStopped) {
log.error("Spark application has already stopped in showDF, please restart it.")
logger.error("Spark application has already stopped in showDF, please restart it.")
throw new LinkisJobRetryException("Spark application sc has already stopped, please restart it.")
}
val startTime = System.currentTimeMillis()
@@ -86,7 +85,7 @@ object SQLSession extends Logging {
//val columnsSet = dataFrame.schema
val columns = columnsSet.map(c =>
Column(c.name, DataType.toDataType(c.dataType.typeName.toLowerCase), c.getComment().orNull)).toArray[Column]
columns.foreach(c => log.info(s"c is ${c.columnName}, comment is ${c.comment}"))
columns.foreach(c => logger.info(s"c is ${c.columnName}, comment is ${c.comment}"))
if (columns == null || columns.isEmpty) return
val metaData = new TableMetaData(columns)
val writer = if (StringUtils.isNotBlank(alias))
@@ -112,7 +111,7 @@ }) { t =>
}) { t =>
throw new SparkEngineException(40001, s"read record exception", t)
}
log.warn(s"Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
logger.warn(s"Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
//Update by peaceWong to register TempTable
//Utils.tryAndErrorMsg(CSTableRegister.registerTempTable(engineExecutorContext, writer, alias, columns))("Failed to register tmp table:")
engineExecutionContext.appendStdout(s"${EngineUtils.getName} >> Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
@@ -125,7 +124,7 @@ object SQLSession extends Logging {
val metaData = new LineMetaData(null)
writer.addMetaData(metaData)
writer.addRecord(new LineRecord(htmlContent.toString))
log.warn(s"Time taken: ${System.currentTimeMillis() - startTime}")
logger.warn(s"Time taken: ${System.currentTimeMillis() - startTime}")

engineExecutionContext.sendResultSet(writer)
}
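Note: the logging changes in this file swap org.apache.spark.internal.Logging for the Logging trait bundled with Linkis (imported alongside Utils), and log.error/info/warn becomes logger.error/info/warn. Spark's internal Logging trait is not a stable public API and its visibility has varied across Spark releases, so depending on it ties this module to a particular Spark build. A minimal sketch of a self-contained Logging trait of the kind assumed here (the real com.webank.wedatasphere.linkis.common.utils.Logging may differ):

    import org.slf4j.{Logger, LoggerFactory}

    trait Logging {
      // one slf4j logger per concrete class, exposed as `logger`
      protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)
    }

    object SQLSessionSketch extends Logging {
      def demo(): Unit = logger.warn("Time taken: 1 ms, Fetched 0 row(s).")
    }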
@@ -192,15 +192,15 @@ object SparkSQLHistoryParser {
plan match {

case c: CreateDataSourceTableAsSelectCommand =>
val columnList = if (null == c.table.schema || c.table.schema.isEmpty) toCSColumnsByColumnName(c.outputColumnNames) else toCSColumns(c.table.schema)
val columnList = toCSColumns(c.table.schema)
addTableOrViewLevelObjs(c.table.identifier, outputObjects, columns = columnList, actionType = TableOperationType.CREATE)
ParseQuery(c.query, inputObjects)

case c: CreateDataSourceTableCommand =>
addTableOrViewLevelObjs(c.table.identifier, outputObjects, columns = toCSColumns(c.table.schema), actionType = TableOperationType.CREATE)

case c: CreateHiveTableAsSelectCommand =>
val columnList = if (null == c.tableDesc.schema || c.tableDesc.schema.isEmpty) toCSColumnsByColumnName(c.outputColumnNames) else toCSColumns(c.tableDesc.schema)
val columnList = toCSColumns(c.tableDesc.schema)
addTableOrViewLevelObjs(c.tableDesc.identifier, outputObjects, columns = columnList, actionType = TableOperationType.CREATE)
ParseQuery(c.query, inputObjects)

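Note: both CREATE ... AS SELECT branches now derive their column list from the command's table schema via toCSColumns alone, rather than falling back to toCSColumnsByColumnName(c.outputColumnNames); as the commit title suggests, outputColumnNames is not exposed by these command classes in every Spark release, so referencing it breaks compilation against older Spark builds. A minimal sketch of the schema-only derivation, with a plain (name, type) pair standing in for the Linkis CSColumn type:

    import org.apache.spark.sql.types.StructType

    // Column metadata taken from the table schema, which exists across Spark versions.
    def columnsFromSchema(schema: StructType): Seq[(String, String)] =
      if (schema == null || schema.isEmpty) Seq.empty
      else schema.fields.toSeq.map(f => (f.name, f.dataType.typeName))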
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.csv

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier

/**
*
@@ -28,6 +27,10 @@ object UDF extends Serializable{
spark.sessionState.functionRegistry.listFunction().foreach(println)

def existsUDF(name: String)(implicit spark: SparkSession): Boolean = {
spark.sessionState.functionRegistry.functionExists(FunctionIdentifier(name))
val list = spark.sessionState.functionRegistry.listFunction()
if (null != list) {
return list.map(_.toString).exists(name.equals(_))
}
false
}
}
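Note: existsUDF no longer goes through functionRegistry.functionExists(FunctionIdentifier(name)), and the now-unused FunctionIdentifier import is dropped. Listing the registered functions and comparing names avoids functionExists, whose parameter type has differed between Spark releases. A minimal sketch of the revised check (it assumes, like the original file, placement under an org.apache.spark.sql package so the session internals are accessible):

    import org.apache.spark.sql.SparkSession

    def existsUDF(name: String)(implicit spark: SparkSession): Boolean = {
      // listFunction() returns the registered function identifiers; comparing their
      // string form sidesteps version differences in the functionExists signature.
      val registered = spark.sessionState.functionRegistry.listFunction()
      registered != null && registered.map(_.toString).exists(name.equals(_))
    }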
