From 5466d57329317c02c43a017805b6a1e794e97754 Mon Sep 17 00:00:00 2001 From: wushengyeyouya <690574002@qq.com> Date: Wed, 25 Aug 2021 20:15:34 +0800 Subject: [PATCH 1/2] Optimize computation client module, add some test classes for new client sdk. --- .../computation/client/LinkisJobBuilder.scala | 3 +- .../computation/client/LinkisJobClient.scala | 25 ++++++++ .../interactive/InteractiveJobBuilder.scala | 20 +++++++ .../computation/client/once/OnceJob.scala | 7 ++- .../computation/client/FlinkOnceJobTest.java | 59 +++++++++++++++++++ .../client/InteractiveJobTest.java | 28 +++++++++ .../ujes/client/UJESClientImplTest.scala | 2 + .../ujes/client/UJESClientImplTestJ.java | 5 +- 8 files changed, 146 insertions(+), 3 deletions(-) create mode 100644 linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobClient.scala create mode 100644 linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/FlinkOnceJobTest.java create mode 100644 linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/InteractiveJobTest.java diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobBuilder.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobBuilder.scala index 43145b26b6..e1212e15eb 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobBuilder.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobBuilder.scala @@ -185,12 +185,13 @@ object LinkisJobBuilder { def setDefaultAuthToken(authTokenValue: String): Unit = this.authTokenValue = authTokenValue + private[client] def justGetDefaultUJESClient: UJESClient = ujesClient + def getDefaultUJESClient: UJESClient = { if(ujesClient == null) synchronized { if(clientConfig == null) buildDefaultConfig() if(ujesClient == null) { ujesClient = new UJESClientImpl(clientConfig) - Utils.addShutdownHook(() => ujesClient.close()) } } ujesClient diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobClient.scala new file mode 100644 index 0000000000..d688354e61 --- /dev/null +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobClient.scala @@ -0,0 +1,25 @@ +package com.webank.wedatasphere.linkis.computation.client + +import java.io.Closeable + +import com.webank.wedatasphere.linkis.computation.client.interactive.InteractiveJob +import com.webank.wedatasphere.linkis.computation.client.once.OnceJob + + +/** + * This class is only used to provide a unified entry for user to build a LinkisJob conveniently and simply. + * Please keep this class lightweight enough, do not set too many field to confuse user. + */ +object LinkisJobClient extends Closeable { + + val config = LinkisJobBuilder + + val interactive = InteractiveJob + val once = OnceJob + + override def close(): Unit = { + if(config.justGetDefaultUJESClient != null) { + config.justGetDefaultUJESClient.close() + } + } +} diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/interactive/InteractiveJobBuilder.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/interactive/InteractiveJobBuilder.scala index f452d2867b..35d1127ea8 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/interactive/InteractiveJobBuilder.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/interactive/InteractiveJobBuilder.scala @@ -18,20 +18,40 @@ package com.webank.wedatasphere.linkis.computation.client.interactive import com.webank.wedatasphere.linkis.computation.client.AbstractLinkisJobBuilder +import com.webank.wedatasphere.linkis.computation.client.utils.LabelKeyUtils import com.webank.wedatasphere.linkis.manager.label.entity.engine.RunType.RunType import com.webank.wedatasphere.linkis.ujes.client.UJESClient import com.webank.wedatasphere.linkis.ujes.client.request.JobSubmitAction +import org.apache.commons.lang.StringUtils class InteractiveJobBuilder private[interactive]() extends AbstractLinkisJobBuilder[SubmittableInteractiveJob] { + private var creator: String = _ + + override def addExecuteUser(executeUser: String): this.type = super.addExecuteUser(executeUser) + + def setEngineType(engineType: String): this.type = addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, engineType) + + def setCreator(creator: String): this.type = { + this.creator = creator + this + } + def setCode(code: String): this.type = addJobContent("code", code) def setRunType(runType: RunType): this.type= addJobContent("runType", runType.toString) def setRunTypeStr(runType: String): this.type = addJobContent("runType", runType) + override protected def validate(): Unit = { + if(labels != null && !labels.containsKey(LabelKeyUtils.USER_CREATOR_LABEL_KEY) + && StringUtils.isNotBlank(creator)) + addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, executeUser + "-" + creator) + super.validate() + } + override protected def createLinkisJob(ujesClient: UJESClient, jobSubmitAction: JobSubmitAction): SubmittableInteractiveJob = new SubmittableInteractiveJob(ujesClient, jobSubmitAction) diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/once/OnceJob.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/once/OnceJob.scala index ab5baab245..6421ef2a28 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/once/OnceJob.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/once/OnceJob.scala @@ -22,6 +22,7 @@ import java.util import com.webank.wedatasphere.linkis.common.ServiceInstance import com.webank.wedatasphere.linkis.computation.client.once.action.{GetEngineConnAction, KillEngineConnAction} +import com.webank.wedatasphere.linkis.computation.client.once.simple.SimpleOnceJob trait OnceJob extends AbstractLinkisJob { @@ -57,4 +58,8 @@ trait OnceJob extends AbstractLinkisJob { } -trait SubmittableOnceJob extends OnceJob with SubmittableLinkisJob \ No newline at end of file +trait SubmittableOnceJob extends OnceJob with SubmittableLinkisJob + +object OnceJob { + val simple = SimpleOnceJob +} \ No newline at end of file diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/FlinkOnceJobTest.java b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/FlinkOnceJobTest.java new file mode 100644 index 0000000000..3f9ce88b83 --- /dev/null +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/FlinkOnceJobTest.java @@ -0,0 +1,59 @@ +package com.webank.wedatasphere.linkis.computation.client; + +import com.webank.wedatasphere.linkis.common.conf.Configuration; +import com.webank.wedatasphere.linkis.computation.client.once.simple.SubmittableSimpleOnceJob; +import com.webank.wedatasphere.linkis.computation.client.utils.LabelKeyUtils; + +/** + * @author enjoyyin + * @date 2021-08-25 + * @since 0.5.0 + */ +public class FlinkOnceJobTest { + public static void main(String[] args) { + // TODO First, set the right gateway url. + LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9002"); + // TODO Second, modify the sql, so Flink engineConn can run it successfully. + String sql = "CREATE TABLE mysql_binlog (\n" + + " id INT NOT NULL,\n" + + " name STRING,\n" + + " age INT\n" + + ") WITH (\n" + + " 'connector' = 'mysql-cdc',\n" + + " 'hostname' = 'ip',\n" + + " 'port' = 'port',\n" + + " 'username' = '${username}',\n" + + " 'password' = '${password}',\n" + + " 'database-name' = '${database}',\n" + + " 'table-name' = '${tablename}',\n" + + " 'debezium.snapshot.locking.mode' = 'none'\n" + + ");\n" + + "CREATE TABLE sink_table (\n" + + " id INT NOT NULL,\n" + + " name STRING,\n" + + " age INT,\n" + + " primary key(id) not enforced\n" + + ") WITH (\n" + + " 'connector' = 'jdbc',\n" + + " 'url' = 'jdbc:mysql://${ip}:port/${database}',\n" + + " 'table-name' = '${tablename}',\n" + + " 'driver' = 'com.mysql.jdbc.Driver',\n" + + " 'username' = '${username}',\n" + + " 'password' = '${password}'\n" + + ");\n" + + "INSERT INTO sink_table SELECT id, name, age FROM mysql_binlog"; + // TODO Thirdly, please modify the user_creator label and executeUser + SubmittableSimpleOnceJob onceJob = LinkisJobClient.once().simple().builder().setCreateService("Flink-Test") + .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY(), "flink-1.12.2") + .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY(), "hadoop-Streamis") + .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY(), "once") + .addStartupParam(Configuration.IS_TEST_MODE().key(), true) + .setMaxSubmitTime(300000) + .addExecuteUser("hadoop").addJobContent("runType", "sql").addJobContent("code", sql).addSource("jobName", "OnceJobTest") + .build(); + onceJob.submit(); + System.out.println(onceJob.getId()); + onceJob.waitForCompleted(); + System.exit(0); + } +} diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/InteractiveJobTest.java b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/InteractiveJobTest.java new file mode 100644 index 0000000000..6bef875edc --- /dev/null +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/InteractiveJobTest.java @@ -0,0 +1,28 @@ +package com.webank.wedatasphere.linkis.computation.client; + +import com.webank.wedatasphere.linkis.computation.client.interactive.SubmittableInteractiveJob; + +/** + * A test class for submit a sql to hive engineConn. + */ +public class InteractiveJobTest { + + public static void main(String[] args) { + // TODO First, set the right gateway url. + LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9002"); + //TODO Secondly, please modify the executeUser + SubmittableInteractiveJob job = LinkisJobClient.interactive().builder() + .setEngineType("hive").setRunTypeStr("sql").setCreator("IDE") + .setCode("show tables").addExecuteUser("hadoop").build(); + // 3. Submit Job to Linkis + job.submit(); + // 4. Wait for Job completed + job.waitForCompleted(); + // 5. Get results from iterators. + ResultSetIterator iterator = job.getResultSetIterables()[0].iterator(); + System.out.println(iterator.getMetadata()); + while(iterator.hasNext()){ + System.out.println(iterator.next()); + } + } +} diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/ujes/client/UJESClientImplTest.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/ujes/client/UJESClientImplTest.scala index 74c271385f..e2c709a611 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/ujes/client/UJESClientImplTest.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/ujes/client/UJESClientImplTest.scala @@ -25,8 +25,10 @@ import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.Engin import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction} import org.apache.commons.io.IOUtils +@Deprecated object UJESClientImplTest extends App { + // Suggest to use LinkisJobClient to submit job to Linkis. val clientConfig = DWSClientConfigBuilder.newBuilder().addServerUrl("http://localhost:port") .connectionTimeout(30000).discoveryEnabled(true) .discoveryFrequency(1, TimeUnit.MINUTES) diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/ujes/client/UJESClientImplTestJ.java b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/ujes/client/UJESClientImplTestJ.java index bd302e7b9b..4492b8df3f 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/ujes/client/UJESClientImplTestJ.java +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/ujes/client/UJESClientImplTestJ.java @@ -30,9 +30,10 @@ import java.util.concurrent.TimeUnit; - +@Deprecated public class UJESClientImplTestJ{ public static void main(String[] args){ + // Suggest to use LinkisJobClient to submit job to Linkis. DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder().addServerUrl("http://localhost:port") .connectionTimeout(30000).discoveryEnabled(true) .discoveryFrequency(1, TimeUnit.MINUTES) @@ -49,6 +50,7 @@ public static void main(String[] args){ JobStatusResult status = client.status(jobExecuteResult); while(!status.isCompleted()) { JobProgressResult progress = client.progress(jobExecuteResult); + System.out.println("progress: " + progress.getProgress()); Utils.sleepQuietly(500); status = client.status(jobExecuteResult); } @@ -58,4 +60,5 @@ public static void main(String[] args){ System.out.println("fileContents: " + fileContents); IOUtils.closeQuietly(client); } + } \ No newline at end of file From fc0b2725670448eba1afed8a7da66842a7c3df4d Mon Sep 17 00:00:00 2001 From: wushengyeyouya <690574002@qq.com> Date: Wed, 25 Aug 2021 20:17:44 +0800 Subject: [PATCH 2/2] Optimize computation client module, add some test classes for new client sdk. --- .../computation/client/LinkisJobClient.scala | 17 +++++++++++++++++ .../computation/client/FlinkOnceJobTest.java | 17 +++++++++++++++++ .../computation/client/InteractiveJobTest.java | 17 +++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobClient.scala index d688354e61..ccd2996aaf 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobClient.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/com/webank/wedatasphere/linkis/computation/client/LinkisJobClient.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.webank.wedatasphere.linkis.computation.client import java.io.Closeable diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/FlinkOnceJobTest.java b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/FlinkOnceJobTest.java index 3f9ce88b83..28f9f7b0a4 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/FlinkOnceJobTest.java +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/FlinkOnceJobTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.webank.wedatasphere.linkis.computation.client; import com.webank.wedatasphere.linkis.common.conf.Configuration; diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/InteractiveJobTest.java b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/InteractiveJobTest.java index 6bef875edc..767c443b03 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/InteractiveJobTest.java +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/com/webank/wedatasphere/linkis/computation/client/InteractiveJobTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.webank.wedatasphere.linkis.computation.client; import com.webank.wedatasphere.linkis.computation.client.interactive.SubmittableInteractiveJob;