Fix compatibility between Spark and Hive versions #966

Merged: 2 commits, merged on Aug 19, 2021
@@ -18,12 +18,10 @@


import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.*;
import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.objectinspector.*;
@@ -57,31 +55,6 @@ public Object doDeserialize(Writable field) throws SerDeException {
throw new SerDeException("DelimitedJSONSerDe cannot deserialize.");
}

@Override
protected void serializeField(ByteStream.Output out, Object obj, ObjectInspector objInspector,
LazySerDeParameters serdeParams) throws SerDeException {
if (!objInspector.getCategory().equals(ObjectInspector.Category.PRIMITIVE) || (objInspector.getTypeName().equalsIgnoreCase(serdeConstants.BINARY_TYPE_NAME))) {
//do this for all complex types and binary
try {
serialize(out, SerDeUtils.getJSONString(obj, objInspector, serdeParams.getNullSequence().toString()),
PrimitiveObjectInspectorFactory.javaStringObjectInspector, serdeParams.getSeparators(),
1, serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(),
serdeParams.getNeedsEscape());

} catch (IOException e) {
throw new SerDeException(e);
}

} else {
//primitives except binary
try {
serialize(out, obj, objInspector, serdeParams.getSeparators(), 1, serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams.getNeedsEscape());
} catch (IOException e) {
throw new SerDeException(e);
}
}
}


public static void serialize(ByteStream.Output out, Object obj, ObjectInspector objInspector, byte[] separators, int level, Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape) throws IOException, SerDeException {
if (obj == null) {
@@ -247,23 +220,14 @@ private static void writePrimitiveUTF8(OutputStream out, Object o,
binaryData = Base64.encodeBase64(String.valueOf(tw).getBytes());
break;
}
case INTERVAL_YEAR_MONTH: {
HiveIntervalYearMonthWritable hw = ((HiveIntervalYearMonthObjectInspector) oi).getPrimitiveWritableObject(o);
binaryData = Base64.encodeBase64(String.valueOf(hw).getBytes());
break;
}
case INTERVAL_DAY_TIME: {
HiveIntervalDayTimeWritable ht = ((HiveIntervalDayTimeObjectInspector) oi).getPrimitiveWritableObject(o);
binaryData = Base64.encodeBase64(String.valueOf(ht).getBytes());
break;
}
case DECIMAL: {
HiveDecimalObjectInspector decimalOI = (HiveDecimalObjectInspector) oi;
binaryData = Base64.encodeBase64(String.valueOf(decimalOI).getBytes());
break;
}
default: {
throw new RuntimeException("Unknown primitive type: " + category);
Object hw = oi.getPrimitiveWritableObject(o);
binaryData = Base64.encodeBase64(String.valueOf(hw).getBytes());
}
}
if(binaryData == null){
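Note: the hunks above drop the serializeField override (which depends on LazySerDeParameters) and collapse the per-type INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME and DECIMAL branches into a single default that Base64-encodes the writable's string form, so the serde no longer references classes that some Hive releases do not ship. A minimal sketch of the resulting pattern (Scala, names illustrative; assumes oi is a Hive PrimitiveObjectInspector):

import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector

// Sketch only: every primitive category is handled uniformly through its
// writable's String representation, instead of one case per writable class.
def encodePrimitive(oi: PrimitiveObjectInspector, o: AnyRef): Array[Byte] = {
  val writable = oi.getPrimitiveWritableObject(o)
  Base64.encodeBase64(String.valueOf(writable).getBytes)
}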
@@ -64,11 +64,9 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
if (BDP_QUEUE_NAME.equals(k)) hiveConf.set(HIVE_QUEUE_NAME, v) else hiveConf.set(k, v)
}
hiveConf.setVar(HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH, HiveEngineConfiguration.HIVE_LIB_HOME.getValue + "/*")
if(HiveEngineConfiguration.ENABLE_FETCH_BASE64){
hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE,HiveEngineConfiguration.BASE64_SERDE_CLASS)
if(HiveEngineConfiguration.ENABLE_FETCH_BASE64) {
hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, HiveEngineConfiguration.BASE64_SERDE_CLASS)
hiveConf.set("enable_fetch_base64","true")
}else{
hiveConf.set("enable_fetch_base64","false")
}
/* //Update by peaceWong add hook to HiveDriver
if (StringUtils.isNotBlank(EnvConfiguration.LINKIS_HIVE_POST_HOOKS)) {
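With the else branch gone, "enable_fetch_base64" is only written when the feature is on, so any reader of the flag has to supply its own default. A hedged sketch of how that could look (HiveConf inherits getBoolean from Hadoop's Configuration; the property name comes from this diff):

// Sketch only: treat an absent enable_fetch_base64 entry as false.
val fetchBase64Enabled: Boolean = hiveConf.getBoolean("enable_fetch_base64", false)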
@@ -18,7 +18,7 @@ package com.webank.wedatasphere.linkis.engineplugin.spark.executor

import java.text.NumberFormat

import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import com.webank.wedatasphere.linkis.engineplugin.spark.config.SparkConfiguration
import com.webank.wedatasphere.linkis.engineplugin.spark.exception.SparkEngineException
@@ -30,7 +30,6 @@ import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, Ta
import com.webank.wedatasphere.linkis.storage.{LineMetaData, LineRecord}
import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

@@ -45,7 +44,7 @@ object SQLSession extends Logging {
def showDF(sc: SparkContext, jobGroup: String, dataFrame: DataFrame, alias: String, maxResult: Int, engineExecutionContext: EngineExecutionContext): Unit = {
//
if (sc.isStopped) {
log.error("Spark application has already stopped in showDF, please restart it.")
logger.error("Spark application has already stopped in showDF, please restart it.")
throw new LinkisJobRetryException("Spark application sc has already stopped, please restart it.")
}
val startTime = System.currentTimeMillis()
@@ -86,7 +85,7 @@
//val columnsSet = dataFrame.schema
val columns = columnsSet.map(c =>
Column(c.name, DataType.toDataType(c.dataType.typeName.toLowerCase), c.getComment().orNull)).toArray[Column]
columns.foreach(c => log.info(s"c is ${c.columnName}, comment is ${c.comment}"))
columns.foreach(c => logger.info(s"c is ${c.columnName}, comment is ${c.comment}"))
if (columns == null || columns.isEmpty) return
val metaData = new TableMetaData(columns)
val writer = if (StringUtils.isNotBlank(alias))
@@ -112,7 +111,7 @@
}) { t =>
throw new SparkEngineException(40001, s"read record exception", t)
}
log.warn(s"Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
logger.warn(s"Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
//Update by peaceWong to register TempTable
//Utils.tryAndErrorMsg(CSTableRegister.registerTempTable(engineExecutorContext, writer, alias, columns))("Failed to register tmp table:")
engineExecutionContext.appendStdout(s"${EngineUtils.getName} >> Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
@@ -125,7 +124,7 @@
val metaData = new LineMetaData(null)
writer.addMetaData(metaData)
writer.addRecord(new LineRecord(htmlContent.toString))
log.warn(s"Time taken: ${System.currentTimeMillis() - startTime}")
logger.warn(s"Time taken: ${System.currentTimeMillis() - startTime}")

engineExecutionContext.sendResultSet(writer)
}
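This file swaps Spark's internal org.apache.spark.internal.Logging trait for Linkis' own Logging utility, and the call sites change from log to logger accordingly, which presumably avoids binding the executor to a Spark-internal API. A minimal sketch of the pattern, using only names visible in this diff (object name hypothetical):

import com.webank.wedatasphere.linkis.common.utils.Logging

// Hypothetical object; logger is the field the diff switches to.
object ExampleSession extends Logging {
  def report(rowCount: Int, startTime: Long): Unit =
    logger.warn(s"Time taken: ${System.currentTimeMillis() - startTime}, Fetched $rowCount row(s).")
}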
@@ -192,15 +192,15 @@ object SparkSQLHistoryParser {
plan match {

case c: CreateDataSourceTableAsSelectCommand =>
val columnList = if (null == c.table.schema || c.table.schema.isEmpty) toCSColumnsByColumnName(c.outputColumnNames) else toCSColumns(c.table.schema)
val columnList = toCSColumns(c.table.schema)
addTableOrViewLevelObjs(c.table.identifier, outputObjects, columns = columnList, actionType = TableOperationType.CREATE)
ParseQuery(c.query, inputObjects)

case c: CreateDataSourceTableCommand =>
addTableOrViewLevelObjs(c.table.identifier, outputObjects, columns = toCSColumns(c.table.schema), actionType = TableOperationType.CREATE)

case c: CreateHiveTableAsSelectCommand =>
val columnList = if (null == c.tableDesc.schema || c.tableDesc.schema.isEmpty) toCSColumnsByColumnName(c.outputColumnNames) else toCSColumns(c.tableDesc.schema)
val columnList = toCSColumns(c.tableDesc.schema)
addTableOrViewLevelObjs(c.tableDesc.identifier, outputObjects, columns = columnList, actionType = TableOperationType.CREATE)
ParseQuery(c.query, inputObjects)

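Here the fallback to c.outputColumnNames is removed and the column list is taken from the declared table schema alone, presumably because outputColumnNames is not present on every Spark line this plugin builds against. A small sketch of pulling names out of that schema (standard StructType API; helper name hypothetical):

import org.apache.spark.sql.types.StructType

// Sketch: column names derived from the CatalogTable's declared schema.
def columnNames(schema: StructType): Seq[String] = schema.fields.map(_.name).toSeq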
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.csv

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier

/**
*
@@ -28,6 +27,10 @@ object UDF extends Serializable{
spark.sessionState.functionRegistry.listFunction().foreach(println)

def existsUDF(name: String)(implicit spark: SparkSession): Boolean = {
spark.sessionState.functionRegistry.functionExists(FunctionIdentifier(name))
val list = spark.sessionState.functionRegistry.listFunction()
if (null != list) {
return list.map(_.toString).exists(name.equals(_))
}
false
}
}
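existsUDF now enumerates the registered functions and compares their string form instead of calling functionRegistry.functionExists(FunctionIdentifier(name)), presumably so the check compiles against Spark builds where functionExists takes a different argument type. A hedged usage sketch (assumes an implicit SparkSession in scope; master setting illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.csv.UDF

// Hypothetical usage of the rewritten check.
implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
val hasUpper: Boolean = UDF.existsUDF("upper")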