Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supply the function in engineconn plugin module #752

Merged
merged 4 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ wds.linkis.engineconn.debug.enable=true

#wds.linkis.keytab.enable=true

wds.linkis.engineconn.plugin.default.clazz=com.webank.wedatasphere.linkis.engineplugin.hive.HiveEngineConnPlugin
wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.engineplugin.hive.HiveEngineConnPlugin

wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.JarUdfEngineHook
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,12 @@
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/linkis.log"
filePattern="logs/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd}-%i.log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
<SizeBasedTriggeringPolicy size="100MB"/>
<DefaultRolloverStrategy max="20"/>
</RollingFile>
<Send name="Send" >
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
</Send>
</appenders>
<loggers>
<root level="INFO">
<!--<appender-ref ref="RollingFile"/>-->
<appender-ref ref="Console"/>
<appender-ref ref="Send"/>
</root>
Expand All @@ -54,6 +47,18 @@
<logger name="org.apache.hadoop.hdfs.KeyProviderCache" level="fatal" additivity="true">
<appender-ref ref="Send"/>
</logger>

<logger name="org.spark_project.jetty" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
<logger name="org.eclipse.jetty" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
<logger name="org.springframework" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
<logger name="org.reflections.Reflections" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>

</loggers>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package com.webank.wedatasphere.linkis.engineplugin.hive.conf

import com.webank.wedatasphere.linkis.common.conf.CommonVars

object HiveEngineConfiguration {

  // Fallback install location used when neither the "hive.lib" property nor
  // the "HIVE_LIB" variable is set — TODO confirm this matches the deploy layout.
  private val DEFAULT_HIVE_LIB_PATH = "/appcom/Install/hive/lib"

  // Hive lib directory: resolved from the "hive.lib" property first, then from
  // the "HIVE_LIB" variable, finally falling back to the hard-coded default.
  val HIVE_LIB_HOME = CommonVars[String]("hive.lib", CommonVars[String]("HIVE_LIB", DEFAULT_HIVE_LIB_PATH).getValue)

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,39 @@ import java.security.PrivilegedExceptionAction

import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.common.creation.EngineCreationContext
import com.webank.wedatasphere.linkis.engineconn.common.engineconn.{DefaultEngineConn, EngineConn}
import com.webank.wedatasphere.linkis.engineconn.core.executor.ExecutorManager
import com.webank.wedatasphere.linkis.engineconn.executor.entity.Executor
import com.webank.wedatasphere.linkis.engineconn.common.engineconn.EngineConn
import com.webank.wedatasphere.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
import com.webank.wedatasphere.linkis.engineconn.executor.entity.LabelExecutor
import com.webank.wedatasphere.linkis.engineplugin.hive.common.HiveUtils
import com.webank.wedatasphere.linkis.engineplugin.hive.conf.HiveEngineConfiguration
import com.webank.wedatasphere.linkis.engineplugin.hive.entity.HiveSession
import com.webank.wedatasphere.linkis.engineplugin.hive.exception.HiveSessionStartFailedException
import com.webank.wedatasphere.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
import com.webank.wedatasphere.linkis.hadoop.common.utils.HDFSUtils
import com.webank.wedatasphere.linkis.manager.engineplugin.common.creation.SingleExecutorEngineConnFactory
import com.webank.wedatasphere.linkis.manager.label.entity.engine.{EngineRunTypeLabel, EngineType, RunType}
import com.webank.wedatasphere.linkis.manager.label.entity.engine.EngineType.EngineType
import com.webank.wedatasphere.linkis.manager.label.entity.engine.RunType.RunType
import com.webank.wedatasphere.linkis.manager.label.entity.engine.{EngineType, RunType}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.session.SessionState

import scala.collection.JavaConversions._

class HiveEngineConnFactory extends SingleExecutorEngineConnFactory with Logging {
class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory with Logging {

private val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename"
private val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue"
private var engineCreationContext: EngineCreationContext = _

override def createExecutor(engineCreationContext: EngineCreationContext, engineConn: EngineConn): Executor = {
engineConn.getEngine() match {
override protected def newExecutor(id: Int, engineCreationContext: EngineCreationContext, engineConn: EngineConn): LabelExecutor = {
engineConn.getEngineConnSession match {
case hiveSession: HiveSession =>
this.engineCreationContext = engineCreationContext
val id = ExecutorManager.getInstance().generateId()
val executor = new HiveEngineConnExecutor(id, hiveSession.sessionState, hiveSession.ugi, hiveSession.hiveConf, hiveSession.baos)
executor.getExecutorLabels().add(getDefaultEngineRunTypeLabel())
executor
new HiveEngineConnExecutor(id, hiveSession.sessionState, hiveSession.ugi, hiveSession.hiveConf, hiveSession.baos)
case _ =>
throw HiveSessionStartFailedException(40012, "Failed to create hive executor")
}
}


override def createEngineConn(engineCreationContext: EngineCreationContext): EngineConn = {
override protected def createEngineConnSession(engineCreationContext: EngineCreationContext): HiveSession = {
val options = engineCreationContext.getOptions
val hiveConf: HiveConf = HiveUtils.getHiveConf
hiveConf.setVar(HiveConf.ConfVars.HIVEJAR, HiveUtils.jarOfClass(classOf[Driver])
Expand All @@ -67,7 +63,17 @@ class HiveEngineConnFactory extends SingleExecutorEngineConnFactory with Logging
info(s"key is $k, value is $v")
if (BDP_QUEUE_NAME.equals(k)) hiveConf.set(HIVE_QUEUE_NAME, v) else hiveConf.set(k, v)
}

hiveConf.setVar(HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH, HiveEngineConfiguration.HIVE_LIB_HOME.getValue + "/*")
/* //Update by peaceWong add hook to HiveDriver
if (StringUtils.isNotBlank(EnvConfiguration.LINKIS_HIVE_POST_HOOKS)) {
val hooks = if (StringUtils.isNotBlank(hiveConf.get("hive.exec.post.hooks"))) {
hiveConf.get("hive.exec.post.hooks") + "," + EnvConfiguration.LINKIS_HIVE_POST_HOOKS
} else {
EnvConfiguration.LINKIS_HIVE_POST_HOOKS
}
hiveConf.set("hive.exec.post.hooks", hooks)
}*/
//Update by peaceWong enable hive.stats.collect.scancols
hiveConf.setBoolean("hive.stats.collect.scancols", true)
val ugi = HDFSUtils.getUserGroupInformation(Utils.getJvmUser)
val sessionState: SessionState = ugi.doAs(new PrivilegedExceptionAction[SessionState] {
Expand All @@ -79,18 +85,11 @@ class HiveEngineConnFactory extends SingleExecutorEngineConnFactory with Logging
sessionState.err = new PrintStream(System.out, true, "utf-8")
SessionState.start(sessionState)

val hiveSession = HiveSession(sessionState, ugi, hiveConf, baos)
val engineConn = new DefaultEngineConn(engineCreationContext)
engineConn.setEngineType(EngineType.HIVE.toString)
engineConn.setEngine(hiveSession)
engineConn
HiveSession(sessionState, ugi, hiveConf, baos)
}

def getEngineCreationContext: EngineCreationContext = engineCreationContext
override protected def getEngineConnType: EngineType = EngineType.HIVE

override protected def getRunType: RunType = RunType.HIVE

override def getDefaultEngineRunTypeLabel(): EngineRunTypeLabel = {
val runTypeLabel = new EngineRunTypeLabel
runTypeLabel.setRunType(RunType.HIVE.toString)
runTypeLabel
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import java.util.concurrent.atomic.AtomicBoolean
import com.webank.wedatasphere.linkis.common.exception.ErrorException
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.parser.SQLCodeParser
import com.webank.wedatasphere.linkis.engineconn.core.EngineConnObject
import com.webank.wedatasphere.linkis.engineplugin.hive.cs.CSHiveHelper
import com.webank.wedatasphere.linkis.engineplugin.hive.exception.HiveQueryFailedException
import com.webank.wedatasphere.linkis.engineplugin.hive.progress.HiveProgressHelper
import com.webank.wedatasphere.linkis.governance.common.paser.SQLCodeParser
import com.webank.wedatasphere.linkis.manager.common.entity.resource.{CommonNodeResource, LoadInstanceResource, NodeResource}
import com.webank.wedatasphere.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
import com.webank.wedatasphere.linkis.manager.label.entity.Label
Expand Down Expand Up @@ -260,7 +260,7 @@ class HiveEngineConnExecutor(id: Int,
singleSqlProgressMap foreach {
case (jobId, progress) => arrayBuffer += JobProgressInfo(jobId, 200, 0, 0, 200)
}
engineExecutorContext.sendProgress(1.0f, arrayBuffer.toArray[JobProgressInfo])
engineExecutorContext.pushProgress(1.0f, arrayBuffer.toArray[JobProgressInfo])
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.common.creation.EngineCreationContext
import com.webank.wedatasphere.linkis.engineconn.common.engineconn.EngineConn
import com.webank.wedatasphere.linkis.engineconn.common.hook.EngineConnHook
import com.webank.wedatasphere.linkis.engineconn.computation.executor.creation.ComputationExecutorManager
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import com.webank.wedatasphere.linkis.engineconn.core.executor.ExecutorManager
import com.webank.wedatasphere.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
import com.webank.wedatasphere.linkis.manager.label.entity.Label
import com.webank.wedatasphere.linkis.manager.label.entity.engine.{CodeLanguageLabel, RunType}
import org.apache.commons.lang.StringUtils

import scala.collection.JavaConversions._
Expand All @@ -45,13 +47,17 @@ class HiveAddJarsEngineHook extends EngineConnHook with Logging {
jars = value
}
}
val codeLanguageLabel = new CodeLanguageLabel
codeLanguageLabel.setCodeType(RunType.HIVE.toString)
val labels = Array[Label[_]](codeLanguageLabel)

if (StringUtils.isNotBlank(jars)) {
jars.split(",") foreach {
jar =>
try {
val sql = addSql + jar
logger.info("begin to run hive sql {}", sql)
ExecutorManager.getInstance().getDefaultExecutor match {
ComputationExecutorManager.getInstance.getExecutorByLabels(labels) match {
case executor: HiveEngineConnExecutor =>
executor.executeLine(new EngineExecutionContext(executor), sql)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.common.creation.EngineCreationContext
import com.webank.wedatasphere.linkis.engineconn.common.engineconn.EngineConn
import com.webank.wedatasphere.linkis.engineconn.common.hook.EngineConnHook
import com.webank.wedatasphere.linkis.engineconn.computation.executor.creation.ComputationExecutorManager
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import com.webank.wedatasphere.linkis.engineconn.core.executor.ExecutorManager
import com.webank.wedatasphere.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
import com.webank.wedatasphere.linkis.manager.label.entity.Label
import com.webank.wedatasphere.linkis.manager.label.entity.engine.{CodeLanguageLabel, RunType}
import org.apache.commons.lang.StringUtils

class UseDatabaseEngineHook extends EngineConnHook with Logging {
Expand All @@ -45,7 +47,10 @@ class UseDatabaseEngineHook extends EngineConnHook with Logging {
}
val useDataBaseSql = "use " + database
info(s"hive client begin to run init_code $useDataBaseSql")
ExecutorManager.getInstance().getDefaultExecutor match {
val codeLanguageLabel = new CodeLanguageLabel
codeLanguageLabel.setCodeType(RunType.HIVE.toString)
val labels = Array[Label[_]](codeLanguageLabel)
ComputationExecutorManager.getInstance.getExecutorByLabels(labels) match {
case executor: HiveEngineConnExecutor =>
executor.executeLine(new EngineExecutionContext(executor), useDataBaseSql)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@

package com.webank.wedatasphere.linkis.engineplugin.hive.launch

import java.util

import com.google.common.collect.Lists
import com.webank.wedatasphere.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest
import com.webank.wedatasphere.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder

class HiveProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {

  // Ask the parent builder to add the hive config path to the launch
  // environment (presumably hive-site.xml etc.; semantics live in the parent).
  override protected def ifAddHiveConfigPath: Boolean = true

  // ECM-side hook class names to run when launching this engine conn process.
  override protected def getEngineConnManagerHooks(implicit engineConnBuildRequest: EngineConnBuildRequest): util.List[String] = {
    val hooks = new util.ArrayList[String]()
    hooks.add("JarUDFLoadECMHook")
    hooks
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

<artifactId>linkis-engineplugin-io_file</artifactId>
<properties>
<io_file.version>1</io_file.version>
<io_file.version>1.0</io_file.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<id>linkis-engineplugin-io_file</id>
<formats>
<format>dir</format>
<format>zip</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
<baseDirectory>io_file</baseDirectory>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.webank.wedatasphere.linkis.manager.engineplugin.io.utils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;


public class ReflectionUtils {
public static Object invoke(Object object, Method method, Object[] args) throws Throwable {
try {
return method.invoke(object, args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ wds.linkis.engineconn.debug.enable=true

#wds.linkis.keytab.enable=true

wds.linkis.engineconn.plugin.default.clazz=com.webank.wedatasphere.linkis.manager.engineplugin.io.IoEngineConnPlugin
wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.manager.engineplugin.io.IoEngineConnPlugin

wds.linkis.engineconn.io.version=1

#wds.linkis.engine.io.opts=" -Dfile.encoding=UTF-8 -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=49100 "

wds.linkis.engineconn.support.parallelism=true

wds.linkis.engineconn.max.free.time=0
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,14 @@
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
</Console>
<RollingFile name="RollingFile" fileName="log/linkis.log"
filePattern="log/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd}-%i.log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
<SizeBasedTriggeringPolicy size="100MB"/>
<DefaultRolloverStrategy max="20"/>
</RollingFile>
<Send name="Send" >
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
</Send>
</appenders>
<loggers>
<root level="INFO">
<!-- <appender-ref ref="RollingFile"/>
<appender-ref ref="Console"/>-->
<appender-ref ref="Console"/>
</root>



</loggers>
</configuration>

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.json4s.DefaultFormats
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

class IoEngineConnExecutor(val id: Int, val outputLimit: Int) extends ConcurrentComputationExecutor(outputLimit) with Logging {
class IoEngineConnExecutor(val id: Int, val outputLimit: Int = 10) extends ConcurrentComputationExecutor(outputLimit) with Logging {

implicit val formats = DefaultFormats

Expand All @@ -62,12 +62,9 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: Int) extends Concurrent
private val namePrefix: String = "IoEngineConnExecutor_"

override def init(): Unit = {
if (!initialized) {
super.init
info("Ready to start IoEngine!")
cleanupThread.start()
initialized = true
}
super.init
info("Ready to start IoEngine!")
cleanupThread.start()
}

/*
Expand All @@ -88,7 +85,7 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: Int) extends Concurrent
clearCount = clearCount + 1
}
}
info(s"Finished to clear userFs, clear count: $clearCount")
debug(s"Finished to clear userFs, clear count: $clearCount")
Utils.tryQuietly(Thread.sleep(IOEngineConnConfiguration.IO_FS_CLEAR_TIME.getValue))
}
}
Expand Down
Loading