From b71e094fed373116949d471d0541d5a3182d199d Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Sat, 29 Jul 2023 15:10:01 +0800 Subject: [PATCH] The YarnClientTrait add clusterClient close (#2906) * The YarnClientTrait add clusterClient close * Optimized code --- .../flink/client/trait/YarnClientTrait.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala index fa88df7dd9..0ce7cb9c78 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala @@ -42,8 +42,8 @@ trait YarnClientTrait extends FlinkClientTrait { request: R, flinkConf: Configuration, actionFunc: (JobID, ClusterClient[_]) => O): O = { - val jobID = getJobID(request.jobId) - val clusterClient = { + + Utils.using { flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId) val clusterClientFactory = new YarnClusterClientFactory val applicationId = clusterClientFactory.getClusterId(flinkConf) @@ -53,15 +53,15 @@ trait YarnClientTrait extends FlinkClientTrait { } val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf) clusterDescriptor.retrieve(applicationId).getClusterClient + } { + client => + Try(actionFunc(getJobID(request.jobId), client)).recover { + case e => + throw new FlinkException( + s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " + + s"detail: ${Utils.stringifyException(e)}"); + }.get } - Try { - actionFunc(jobID, clusterClient) - }.recover { - case e => - throw new FlinkException( - s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " + - s"detail: ${Utils.stringifyException(e)}"); - }.get } override def doTriggerSavepoint(