From 917f743e68261032cb83522b08aae48538663649 Mon Sep 17 00:00:00 2001
From: peacewong
Date: Thu, 19 Aug 2021 10:30:14 +0800
Subject: [PATCH 1/2] Solve Spark version compatibility issues

---
 .../engineplugin/spark/executor/SQLSession.scala   | 11 +++++------
 .../spark/metadata/SparkSQLHistoryParser.scala     |  4 ++--
 .../spark/sql/execution/datasources/csv/UDF.scala  |  7 +++++--
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/executor/SQLSession.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/executor/SQLSession.scala
index 2adfd72df7..617021421d 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/executor/SQLSession.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/executor/SQLSession.scala
@@ -18,7 +18,7 @@
 package com.webank.wedatasphere.linkis.engineplugin.spark.executor
 
 import java.text.NumberFormat
-import com.webank.wedatasphere.linkis.common.utils.Utils
+import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
 import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.EngineExecutionContext
 import com.webank.wedatasphere.linkis.engineplugin.spark.config.SparkConfiguration
 import com.webank.wedatasphere.linkis.engineplugin.spark.exception.SparkEngineException
@@ -30,7 +29,6 @@ import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, Ta
 import com.webank.wedatasphere.linkis.storage.{LineMetaData, LineRecord}
 
 import org.apache.commons.lang.StringUtils
 import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -45,7 +44,7 @@ object SQLSession extends Logging {
   def showDF(sc: SparkContext, jobGroup: String, dataFrame: DataFrame, alias: String, maxResult: Int, engineExecutionContext: EngineExecutionContext): Unit = {
     //
     if (sc.isStopped) {
-      log.error("Spark application has already stopped in showDF, please restart it.")
+      logger.error("Spark application has already stopped in showDF, please restart it.")
       throw new LinkisJobRetryException("Spark application sc has already stopped, please restart it.")
     }
     val startTime = System.currentTimeMillis()
@@ -86,7 +85,7 @@ object SQLSession extends Logging {
 
     //val columnsSet = dataFrame.schema
     val columns = columnsSet.map(c => Column(c.name, DataType.toDataType(c.dataType.typeName.toLowerCase), c.getComment().orNull)).toArray[Column]
-    columns.foreach(c => log.info(s"c is ${c.columnName}, comment is ${c.comment}"))
+    columns.foreach(c => logger.info(s"c is ${c.columnName}, comment is ${c.comment}"))
     if (columns == null || columns.isEmpty) return
     val metaData = new TableMetaData(columns)
     val writer = if (StringUtils.isNotBlank(alias))
@@ -112,7 +111,7 @@ object SQLSession extends Logging {
     }) { t =>
       throw new SparkEngineException(40001, s"read record exception", t)
     }
-    log.warn(s"Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
+    logger.warn(s"Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
     //Update by peaceWong to register TempTable
     //Utils.tryAndErrorMsg(CSTableRegister.registerTempTable(engineExecutorContext, writer, alias, columns))("Failed to register tmp table:")
     engineExecutionContext.appendStdout(s"${EngineUtils.getName} >> Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
@@ -125,7 +124,7 @@ object SQLSession extends Logging {
     val metaData = new LineMetaData(null)
     writer.addMetaData(metaData)
     writer.addRecord(new LineRecord(htmlContent.toString))
-    log.warn(s"Time taken: ${System.currentTimeMillis() - startTime}")
+    logger.warn(s"Time taken: ${System.currentTimeMillis() - startTime}")
     engineExecutionContext.sendResultSet(writer)
   }
 
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala
index 8c19f95775..ab000f6a3c 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala
@@ -192,7 +192,7 @@
 
     plan match {
       case c: CreateDataSourceTableAsSelectCommand =>
-        val columnList = if (null == c.table.schema || c.table.schema.isEmpty) toCSColumnsByColumnName(c.outputColumnNames) else toCSColumns(c.table.schema)
+        val columnList = toCSColumns(c.table.schema)
         addTableOrViewLevelObjs(c.table.identifier, outputObjects, columns = columnList, actionType = TableOperationType.CREATE)
         ParseQuery(c.query, inputObjects)
 
@@ -200,7 +200,7 @@
         addTableOrViewLevelObjs(c.table.identifier, outputObjects, columns = toCSColumns(c.table.schema), actionType = TableOperationType.CREATE)
 
       case c: CreateHiveTableAsSelectCommand =>
-        val columnList = if (null == c.tableDesc.schema || c.tableDesc.schema.isEmpty) toCSColumnsByColumnName(c.outputColumnNames) else toCSColumns(c.tableDesc.schema)
+        val columnList = toCSColumns(c.tableDesc.schema)
         addTableOrViewLevelObjs(c.tableDesc.identifier, outputObjects, columns = columnList, actionType = TableOperationType.CREATE)
         ParseQuery(c.query, inputObjects)
 
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala
index e6e5f27501..14d47e7049 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.FunctionIdentifier
 
 /**
   *
@@ -28,6 +27,10 @@ object UDF extends Serializable{
   spark.sessionState.functionRegistry.listFunction().foreach(println)
 
   def existsUDF(name: String)(implicit spark: SparkSession): Boolean = {
-    spark.sessionState.functionRegistry.functionExists(FunctionIdentifier(name))
+    val list = spark.sessionState.functionRegistry.listFunction()
+    if (null != list) {
+      return list.map(_.toString).exists(name.equals(_))
+    }
+    false
   }
 }
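
For reference, below is a minimal standalone sketch of the lookup that the reworked existsUDF above performs: it lists every registered function and compares string forms rather than going through functionExists(FunctionIdentifier(name)), an API that is not uniform across the Spark versions Linkis targets (which appears to be the motivation for the change). The sketch is not part of the patch; the object name, the local-mode session and the registered UDF are illustrative only, and the file must live under an org.apache.spark.sql package, as the patched UDF.scala does, because SparkSession.sessionState is private[sql].

    package org.apache.spark.sql.execution.datasources.csv

    import org.apache.spark.sql.SparkSession

    object ExistsUDFSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("exists-udf-sketch").getOrCreate()
        spark.udf.register("my_upper", (s: String) => s.toUpperCase)

        // Same idea as the patched existsUDF: take the string form of every
        // registered function and test the wanted name against that listing.
        // Printing the matching entries shows what the string form looks like
        // on the Spark version actually in use.
        val registered = spark.sessionState.functionRegistry.listFunction().map(_.toString)
        println(registered.filter(_.contains("my_upper")))

        spark.stop()
      }
    }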
From ba6eb8847a16f514d9160e1fd2390235cbf80f6f Mon Sep 17 00:00:00 2001
From: peacewong
Date: Thu, 19 Aug 2021 10:59:40 +0800
Subject: [PATCH 2/2] Solve Hive version compatibility issues

---
 .../serde/CustomerDelimitedJSONSerDe.java        | 44 ++-----
 .../hive/creation/HiveEngineConnFactory.scala    |  6 +--
 2 files changed, 6 insertions(+), 44 deletions(-)

diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/java/com/webank/wedatasphere/linkis/engineplugin/hive/serde/CustomerDelimitedJSONSerDe.java b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/java/com/webank/wedatasphere/linkis/engineplugin/hive/serde/CustomerDelimitedJSONSerDe.java
index c84c89f017..ac63f6a694 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/java/com/webank/wedatasphere/linkis/engineplugin/hive/serde/CustomerDelimitedJSONSerDe.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/java/com/webank/wedatasphere/linkis/engineplugin/hive/serde/CustomerDelimitedJSONSerDe.java
@@ -18,12 +18,10 @@
 
 import org.apache.commons.codec.binary.Base64;
 
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.io.*;
-import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.*;
@@ -57,31 +55,6 @@ public Object doDeserialize(Writable field) throws SerDeException {
     throw new SerDeException("DelimitedJSONSerDe cannot deserialize.");
   }
 
-  @Override
-  protected void serializeField(ByteStream.Output out, Object obj, ObjectInspector objInspector,
-                                LazySerDeParameters serdeParams) throws SerDeException {
-    if (!objInspector.getCategory().equals(ObjectInspector.Category.PRIMITIVE) || (objInspector.getTypeName().equalsIgnoreCase(serdeConstants.BINARY_TYPE_NAME))) {
-      //do this for all complex types and binary
-      try {
-        serialize(out, SerDeUtils.getJSONString(obj, objInspector, serdeParams.getNullSequence().toString()),
-            PrimitiveObjectInspectorFactory.javaStringObjectInspector, serdeParams.getSeparators(),
-            1, serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(),
-            serdeParams.getNeedsEscape());
-
-      } catch (IOException e) {
-        throw new SerDeException(e);
-      }
-
-    } else {
-      //primitives except binary
-      try {
-        serialize(out, obj, objInspector, serdeParams.getSeparators(), 1, serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams.getNeedsEscape());
-      } catch (IOException e) {
-        throw new SerDeException(e);
-      }
-    }
-  }
-
   public static void serialize(ByteStream.Output out, Object obj, ObjectInspector objInspector,
       byte[] separators, int level, Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape) throws IOException, SerDeException {
     if (obj == null) {
@@ -247,23 +220,14 @@ private static void writePrimitiveUTF8(OutputStream out, Object o,
         binaryData = Base64.encodeBase64(String.valueOf(tw).getBytes());
         break;
       }
-      case INTERVAL_YEAR_MONTH: {
-        HiveIntervalYearMonthWritable hw = ((HiveIntervalYearMonthObjectInspector) oi).getPrimitiveWritableObject(o);
-        binaryData = Base64.encodeBase64(String.valueOf(hw).getBytes());
-        break;
-      }
-      case INTERVAL_DAY_TIME: {
-        HiveIntervalDayTimeWritable ht = ((HiveIntervalDayTimeObjectInspector) oi).getPrimitiveWritableObject(o);
-        binaryData = Base64.encodeBase64(String.valueOf(ht).getBytes());
-        break;
-      }
       case DECIMAL: {
         HiveDecimalObjectInspector decimalOI = (HiveDecimalObjectInspector) oi;
         binaryData = Base64.encodeBase64(String.valueOf(decimalOI).getBytes());
         break;
       }
       default: {
-        throw new RuntimeException("Unknown primitive type: " + category);
+        Object hw = oi.getPrimitiveWritableObject(o);
+        binaryData = Base64.encodeBase64(String.valueOf(hw).getBytes());
       }
     }
     if(binaryData == null){
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
index c2ec03c03b..297003e0b8 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
@@ -64,11 +64,9 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
       if (BDP_QUEUE_NAME.equals(k)) hiveConf.set(HIVE_QUEUE_NAME, v) else hiveConf.set(k, v)
     }
     hiveConf.setVar(HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH, HiveEngineConfiguration.HIVE_LIB_HOME.getValue + "/*")
-    if(HiveEngineConfiguration.ENABLE_FETCH_BASE64){
-      hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE,HiveEngineConfiguration.BASE64_SERDE_CLASS)
+    if(HiveEngineConfiguration.ENABLE_FETCH_BASE64) {
+      hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, HiveEngineConfiguration.BASE64_SERDE_CLASS)
       hiveConf.set("enable_fetch_base64","true")
-    }else{
-      hiveConf.set("enable_fetch_base64","false")
     }
     /* //Update by peaceWong add hook to HiveDriver
     if (StringUtils.isNotBlank(EnvConfiguration.LINKIS_HIVE_POST_HOOKS)) {
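
For reference, below is a minimal standalone sketch of the conditional SerDe wiring that HiveEngineConnFactory now performs: the fetch-output SerDe and the enable_fetch_base64 marker are set only when base64 fetch is enabled, and the Hive defaults are left untouched otherwise (the else branch that wrote "false" explicitly is gone). The sketch is not part of the patch and assumes hive-common is on the classpath; the boolean flag and the class-name literal merely stand in for HiveEngineConfiguration.ENABLE_FETCH_BASE64 and BASE64_SERDE_CLASS.

    import org.apache.hadoop.hive.conf.HiveConf

    object FetchSerdeConfigSketch {
      def main(args: Array[String]): Unit = {
        // Stand-ins for HiveEngineConfiguration.ENABLE_FETCH_BASE64 and BASE64_SERDE_CLASS.
        val enableFetchBase64 = true
        val base64SerdeClass =
          "com.webank.wedatasphere.linkis.engineplugin.hive.serde.CustomerDelimitedJSONSerDe"

        val hiveConf = new HiveConf()
        if (enableFetchBase64) {
          // Route fetch-task output through the custom SerDe and mark the session,
          // mirroring the branch kept by the patch; when the flag is off, nothing is set.
          hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, base64SerdeClass)
          hiveConf.set("enable_fetch_base64", "true")
        }

        println(hiveConf.getVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE))
        println(hiveConf.get("enable_fetch_base64", "<unset>"))
      }
    }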