From c89ceed3d9987132486a714264fc4da391655c5d Mon Sep 17 00:00:00 2001 From: zhengke zhou <36791902+zzzk1@users.noreply.github.com> Date: Mon, 22 Jan 2024 13:48:13 +0800 Subject: [PATCH] [ISSUE-3472][Improve] Improve streampark-console module Controller invoke service (#3488) [ISSUE-3472][Improve] Improve streampark-console module Controller invoke service --- .../core/controller/AlertController.java | 2 +- .../controller/ApplicationController.java | 14 ++++----- .../core/controller/ConfigController.java | 2 +- .../controller/FlinkClusterController.java | 2 +- .../core/controller/FlinkSqlController.java | 2 +- .../core/controller/ResourceController.java | 2 +- .../core/mapper/ApplicationMapper.java | 2 +- .../service/ApplicationConfigService.java | 4 +-- .../core/service/FlinkClusterService.java | 2 +- .../console/core/service/FlinkSqlService.java | 2 +- .../console/core/service/ResourceService.java | 4 +-- .../service/alert/AlertConfigService.java | 2 +- .../alert/impl/AlertConfigServiceImpl.java | 4 +-- .../application/ApplicationActionService.java | 8 ++--- .../application/ApplicationInfoService.java | 12 +++---- .../application/ApplicationManageService.java | 8 ++--- .../impl/ApplicationActionServiceImpl.java | 31 +++++++++---------- .../impl/ApplicationInfoServiceImpl.java | 12 +++---- .../impl/ApplicationManageServiceImpl.java | 14 ++++----- .../impl/ApplicationConfigServiceImpl.java | 6 ++-- .../service/impl/FlinkClusterServiceImpl.java | 3 +- .../service/impl/FlinkSqlServiceImpl.java | 6 ++-- .../service/impl/ResourceServiceImpl.java | 6 ++-- .../system/controller/MemberController.java | 2 +- .../console/system/service/MemberService.java | 2 +- .../service/impl/MemberServiceImpl.java | 8 ++--- .../mapper/core/ApplicationMapper.xml | 4 +-- .../ApplicationManageServiceITest.java | 2 +- 28 files changed, 82 insertions(+), 86 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java index 381206f13f..234e5259f5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java @@ -92,7 +92,7 @@ public RestResponse getAlertConfig(@RequestBody AlertConfigParams params) { @PostMapping(value = "/list") public RestResponse alertConfigsPaginationList( @RequestBody AlertConfigParams params, RestRequest request) { - IPage page = alertConfigService.page(params, request); + IPage page = alertConfigService.page(params.getUserId(), request); return RestResponse.success(page); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java index 412eabef43..77736db007 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java @@ -86,7 +86,7 @@ public class ApplicationController { @PostMapping("get") @RequiresPermissions("app:detail") public RestResponse get(Application app) { - Application application = applicationManageService.getApp(app); + Application application = applicationManageService.getApp(app.getId()); return RestResponse.success(application); } @@ -167,7 +167,7 @@ public RestResponse mapping(Application app) { @PostMapping("revoke") @RequiresPermissions("app:release") public RestResponse revoke(Application app) { - applicationActionService.revoke(app); + applicationActionService.revoke(app.getId()); return RestResponse.success(); } @@ -175,7 +175,7 @@ public RestResponse revoke(Application app) { @PostMapping(value = "check_start") @RequiresPermissions("app:start") public RestResponse checkStart(Application app) { - AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app); + AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app.getId()); return RestResponse.success(stateEnum.get()); } @@ -286,7 +286,7 @@ public RestResponse clean(Application app) { @PostMapping("forcedStop") @RequiresPermissions("app:cancel") public RestResponse forcedStop(Application app) { - applicationActionService.forcedStop(app); + applicationActionService.forcedStop(app.getId()); return RestResponse.success(); } @@ -299,7 +299,7 @@ public RestResponse yarn() { @Operation(summary = "Get application on yarn name") @PostMapping("name") public RestResponse yarnName(Application app) { - String yarnName = applicationInfoService.getYarnName(app); + String yarnName = applicationInfoService.getYarnName(app.getConfig()); return RestResponse.success(yarnName); } @@ -313,7 +313,7 @@ public RestResponse checkName(Application app) { @Operation(summary = "Get application conf") @PostMapping("readConf") public RestResponse readConf(Application app) throws IOException { - String config = applicationInfoService.readConf(app); + String config = applicationInfoService.readConf(app.getConfig()); return RestResponse.success(config); } @@ -352,7 +352,7 @@ public RestResponse deleteOperationLog(Long id) { @PostMapping("delete") @RequiresPermissions("app:delete") public RestResponse delete(Application app) throws InternalException { - Boolean deleted = applicationManageService.remove(app); + Boolean deleted = applicationManageService.remove(app.getId()); return RestResponse.success(deleted); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java index 376cf9ea65..06e1c167c3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java @@ -73,7 +73,7 @@ public RestResponse list(ApplicationConfig config, RestRequest request) { @Operation(summary = "List application config histories") @PostMapping("history") public RestResponse history(Application application) { - List history = applicationConfigService.list(application); + List history = applicationConfigService.list(application.getId()); return RestResponse.success(history); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java index 2af28b1fb0..27b0b80aaa 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java @@ -118,7 +118,7 @@ public RestResponse shutdown(FlinkCluster cluster) { @Operation(summary = "Delete flink cluster") @PostMapping("delete") public RestResponse delete(FlinkCluster cluster) { - flinkClusterService.remove(cluster); + flinkClusterService.remove(cluster.getId()); return RestResponse.success(); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java index c407325aee..e3dba36890 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java @@ -118,7 +118,7 @@ public RestResponse get(String id) throws InternalException { @Operation(summary = "List the applications sql histories") @PostMapping("history") public RestResponse sqlhistory(Application application) { - List sqlList = flinkSqlService.listFlinkSqlHistory(application); + List sqlList = flinkSqlService.listFlinkSqlHistory(application.getId()); return RestResponse.success(sqlList); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java index 9e2b536759..c2761e0bcd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java @@ -84,7 +84,7 @@ public RestResponse updateResource(@Valid Resource resource) { @DeleteMapping("delete") @RequiresPermissions("resource:delete") public RestResponse deleteResource(@Valid Resource resource) { - this.resourceService.remove(resource); + this.resourceService.remove(resource.getId()); return RestResponse.success(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java index 095b240d1d..970ddc734f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java @@ -31,7 +31,7 @@ public interface ApplicationMapper extends BaseMapper { IPage selectPage(Page page, @Param("app") Application application); - Application selectApp(@Param("app") Application application); + Application selectApp(@Param("id") Long id); void persistMetrics(@Param("app") Application application); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java index 1c84852106..325d35e51b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java @@ -105,10 +105,10 @@ public interface ApplicationConfigService extends IService { /** * Retrieves the history of application configurations for a given application. * - * @param appParam The application for which to retrieve the history. + * @param appId The application's id for which to retrieve the history. * @return The list of application configurations representing the history. */ - List list(Application appParam); + List list(Long appId); /** * Reads a template from a file or a database. diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java index ff043e8256..cdf3b2a884 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java @@ -35,7 +35,7 @@ public interface FlinkClusterService extends IService { Boolean create(FlinkCluster flinkCluster); - void remove(FlinkCluster flinkCluster); + void remove(Long id); void update(FlinkCluster flinkCluster); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java index 050d94903e..39659da2ce 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java @@ -38,7 +38,7 @@ public interface FlinkSqlService extends IService { FlinkSql getLatestFlinkSql(Long appId, boolean decode); - List listFlinkSqlHistory(Application application); + List listFlinkSqlHistory(Long appId); FlinkSql getCandidate(Long appId, CandidateTypeEnum type); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java index 22f33c243b..5be7a2292b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java @@ -71,9 +71,9 @@ public interface ResourceService extends IService { /** * delete resource * - * @param resource + * @param id */ - void remove(Resource resource); + void remove(Long id); /** * Get resource through team id. diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertConfigService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertConfigService.java index 6da414fb7d..7c0dad5627 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertConfigService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertConfigService.java @@ -26,7 +26,7 @@ import com.baomidou.mybatisplus.extension.service.IService; public interface AlertConfigService extends IService { - IPage page(AlertConfigParams params, RestRequest request); + IPage page(Long userId, RestRequest request); boolean exist(AlertConfig alertConfig); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java index ed8d17f2de..6d889a7bad 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java @@ -50,10 +50,10 @@ public class AlertConfigServiceImpl extends ServiceImpl page(AlertConfigParams params, RestRequest request) { + public IPage page(Long userId, RestRequest request) { // build query conditions LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); - wrapper.eq(params.getUserId() != null, AlertConfig::getUserId, params.getUserId()); + wrapper.eq(userId != null, AlertConfig::getUserId, userId); Page page = MybatisPager.getPage(request); IPage resultPage = getBaseMapper().selectPage(page, wrapper); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java index e2936cb73d..2ab14f91e5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java @@ -48,10 +48,10 @@ public interface ApplicationActionService extends IService { /** * Revokes access for the given application. * - * @param appParam The application for which access needs to be revoked. + * @param appId The application's id for which access needs to be revoked. * @throws ApplicationException if an error occurs while revoking access. */ - void revoke(Application appParam) throws ApplicationException; + void revoke(Long appId) throws ApplicationException; /** * Cancels the given application. Throws an exception if cancellation fails. @@ -64,7 +64,7 @@ public interface ApplicationActionService extends IService { /** * Forces the given application to stop. * - * @param appParam the application to be stopped + * @param id the application's id which need to be stopped */ - void forcedStop(Application appParam); + void forcedStop(Long id); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java index 8d1e0b112d..c40cc6a2e2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java @@ -123,10 +123,10 @@ public interface ApplicationInfoService extends IService { /** * Gets the YARN name for the given application. * - * @param appParam The application for which to retrieve the YARN name. + * @param appConfig The application's config for which to retrieve the YARN name. * @return The YARN name of the application as a String. */ - String getYarnName(Application appParam); + String getYarnName(String appConfig); /** * Checks if the given application exists in the system. @@ -139,11 +139,11 @@ public interface ApplicationInfoService extends IService { /** * Reads the configuration for the given application and returns it as a String. * - * @param appParam The application for which the configuration needs to be read. + * @param appConfig The application's config for which the configuration needs to be read. * @return The configuration for the given application as a String. * @throws IOException If an I/O error occurs while reading the configuration. */ - String readConf(Application appParam) throws IOException; + String readConf(String appConfig) throws IOException; /** * Retrieves the main configuration value for the given Application. @@ -226,10 +226,10 @@ public interface ApplicationInfoService extends IService { /** * check application before start * - * @param appParam + * @param id the application's id which need to check before start. * @return org.apache.streampark.console.core.enums.AppExistsStateEnum */ - AppExistsStateEnum checkStart(Application appParam); + AppExistsStateEnum checkStart(Long id); /** * @param appName diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java index 4c111cc764..a5263fee51 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java @@ -102,19 +102,19 @@ public interface ApplicationManageService extends IService { /** * Deletes the given Application from the system. * - * @param appParam The Application to be deleted. + * @param appId The Application's id which need to be deleted. * @return True if the deletion was successful, false otherwise. */ - Boolean remove(Application appParam); + Boolean remove(Long appId); /** * Retrieves the Application with the specified details from the system. * - * @param appParam The Application object containing the details of the Application to retrieve. + * @param id The Application object's id. * @return The Application object that matches the specified details, or null if no matching * Application is found. */ - Application getApp(Application appParam); + Application getApp(Long id); /** * Updates the release of the given application. diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index 54fc3ec677..70f02315c9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -173,11 +173,10 @@ public class ApplicationActionServiceImpl extends ServiceImpl(); @Override - public void revoke(Application appParam) throws ApplicationException { - Application application = getById(appParam.getId()); + public void revoke(Long appId) throws ApplicationException { + Application application = getById(appId); ApiAlertException.throwIfNull( - application, - String.format("The application id=%s not found, revoke failed.", appParam.getId())); + application, String.format("The application id=%s not found, revoke failed.", appId)); // 1) delete files that have been published to workspace application.getFsOperator().delete(application.getAppHome()); @@ -206,10 +205,10 @@ public void restart(Application appParam) throws Exception { } @Override - public void forcedStop(Application appParam) { - CompletableFuture startFuture = startFutureMap.remove(appParam.getId()); - CompletableFuture cancelFuture = cancelFutureMap.remove(appParam.getId()); - Application application = this.baseMapper.selectApp(appParam); + public void forcedStop(Long id) { + CompletableFuture startFuture = startFutureMap.remove(id); + CompletableFuture cancelFuture = cancelFutureMap.remove(id); + Application application = this.baseMapper.selectApp(id); if (isKubernetesApp(application)) { KubernetesDeploymentHelper.watchPodTerminatedLog( application.getK8sNamespace(), application.getJobName(), application.getJobId()); @@ -225,7 +224,7 @@ public void forcedStop(Application appParam) { cancelFuture.cancel(true); } if (startFuture == null && cancelFuture == null) { - this.doStopped(appParam); + this.doStopped(id); } } @@ -332,7 +331,7 @@ public void cancel(Application appParam) throws Exception { applicationLogService.save(applicationLog); if (throwable instanceof CancellationException) { - doStopped(application); + doStopped(application.getId()); } else { log.error("stop flink job failed.", throwable); application.setOptionState(OptionStateEnum.NONE.getValue()); @@ -500,7 +499,7 @@ public void start(Application appParam, boolean auto) throws Exception { applicationLog.setSuccess(false); applicationLogService.save(applicationLog); if (throwable instanceof CancellationException) { - doStopped(application); + doStopped(application.getId()); } else { Application app = getById(appParam.getId()); app.setState(FlinkAppStateEnum.FAILED.getValue()); @@ -772,8 +771,8 @@ private Map getProperties(Application application) { return properties; } - private void doStopped(Application appParam) { - Application application = getById(appParam); + private void doStopped(Long id) { + Application application = getById(id); application.setOptionState(OptionStateEnum.NONE.getValue()); application.setState(FlinkAppStateEnum.CANCELED.getValue()); application.setOptionTime(new Date()); @@ -781,9 +780,9 @@ private void doStopped(Application appParam) { savePointService.expire(application.getId()); // re-tracking flink job on kubernetes and logging exception if (isKubernetesApp(application)) { - TrackId id = toTrackId(application); - k8SFlinkTrackMonitor.unWatching(id); - k8SFlinkTrackMonitor.doWatching(id); + TrackId trackId = toTrackId(application); + k8SFlinkTrackMonitor.unWatching(trackId); + k8SFlinkTrackMonitor.doWatching(trackId); } else { FlinkAppHttpWatcher.unWatching(application.getId()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java index 36a1d05cc1..9652666397 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java @@ -325,8 +325,8 @@ public List listHistoryUploadJars() { } @Override - public AppExistsStateEnum checkStart(Application appParam) { - Application application = getById(appParam.getId()); + public AppExistsStateEnum checkStart(Long id) { + Application application = getById(id); if (application == null) { return AppExistsStateEnum.INVALID; } @@ -408,10 +408,10 @@ public String k8sStartLog(Long id, Integer offset, Integer limit) throws Excepti } @Override - public String getYarnName(Application appParam) { + public String getYarnName(String appConfig) { String[] args = new String[2]; args[0] = "--name"; - args[1] = appParam.getConfig(); + args[1] = appConfig; return ParameterCli.read(args); } @@ -479,8 +479,8 @@ private boolean existsByJobName(String jobName) { } @Override - public String readConf(Application appParam) throws IOException { - File file = new File(appParam.getConfig()); + public String readConf(String appConfig) throws IOException { + File file = new File(appConfig); String conf = org.apache.streampark.common.util.FileUtils.readFile(file); return Base64.getEncoder().encodeToString(conf.getBytes()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java index 681d6f7d79..b8e4f1f921 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java @@ -183,9 +183,9 @@ public boolean mapping(Application appParam) { } @Override - public Boolean remove(Application appParam) { + public Boolean remove(Long appId) { - Application application = getById(appParam.getId()); + Application application = getById(appId); // 1) remove flink sql flinkSqlService.removeByAppId(application.getId()); @@ -217,7 +217,7 @@ public Boolean remove(Application appParam) { flinkK8sObserver.unWatchById(application.getId()); } } else { - FlinkAppHttpWatcher.unWatching(appParam.getId()); + FlinkAppHttpWatcher.unWatching(appId); } return true; } @@ -762,10 +762,10 @@ public void clean(Application appParam) { } @Override - public Application getApp(Application appParam) { - Application application = this.baseMapper.selectApp(appParam); - ApplicationConfig config = configService.getEffective(appParam.getId()); - config = config == null ? configService.getLatest(appParam.getId()) : config; + public Application getApp(Long id) { + Application application = this.baseMapper.selectApp(id); + ApplicationConfig config = configService.getEffective(id); + config = config == null ? configService.getLatest(id) : config; if (config != null) { config.setToApplication(application); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java index b711e8d0fe..76220dd566 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java @@ -211,14 +211,14 @@ public IPage getPage(ApplicationConfig config, RestRequest re } @Override - public List list(Application appParam) { + public List list(Long appId) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(ApplicationConfig::getAppId, appParam.getId()) + .eq(ApplicationConfig::getAppId, appId) .orderByDesc(ApplicationConfig::getVersion); List configList = this.baseMapper.selectList(queryWrapper); - fillEffectiveField(appParam.getId(), configList); + fillEffectiveField(appId, configList); return configList; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index 4e3875fee9..6587436889 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -342,8 +342,7 @@ public void updateClusterState(Long id, ClusterState state) { } @Override - public void remove(FlinkCluster cluster) { - Long id = cluster.getId(); + public void remove(Long id) { FlinkCluster flinkCluster = getById(id); ApiAlertException.throwIfNull(flinkCluster, "Flink cluster not exist, please check."); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java index 69c175307f..d648a22945 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java @@ -117,14 +117,14 @@ public void setCandidate(CandidateTypeEnum candidateTypeEnum, Long appId, Long s } @Override - public List listFlinkSqlHistory(Application application) { + public List listFlinkSqlHistory(Long appId) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(FlinkSql::getAppId, application.getId()) + .eq(FlinkSql::getAppId, appId) .orderByDesc(FlinkSql::getVersion); List sqlList = this.baseMapper.selectList(queryWrapper); - FlinkSql effective = getEffective(application.getId(), false); + FlinkSql effective = getEffective(appId, false); if (effective != null && !sqlList.isEmpty()) { for (FlinkSql sql : sqlList) { if (sql.getId().equals(effective.getId())) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java index 6185be20aa..29e6e77304 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java @@ -207,8 +207,8 @@ public void updateResource(Resource resource) { } @Override - public void remove(Resource resource) { - Resource findResource = getById(resource.getId()); + public void remove(Long id) { + Resource findResource = getById(id); checkOrElseAlert(findResource); String filePath = @@ -224,7 +224,7 @@ public void remove(Resource resource) { FsOperator.lfs().delete(filePath); - this.removeById(resource); + this.removeById(id); } public List listByTeamId(Long teamId) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/MemberController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/MemberController.java index b9feac9bd8..900245f4e2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/MemberController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/MemberController.java @@ -96,7 +96,7 @@ public RestResponse create(@Valid Member member) { @DeleteMapping("delete") @RequiresPermissions("member:delete") public RestResponse delete(Member member) { - this.memberService.remove(member); + this.memberService.remove(member.getId()); return RestResponse.success(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/MemberService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/MemberService.java index b92294365b..a9936f22a5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/MemberService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/MemberService.java @@ -47,7 +47,7 @@ public interface MemberService extends IService { void createMember(Member member); - void remove(Member member); + void remove(Long id); void updateMember(Member member); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java index c6634732a4..4c231a63a4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java @@ -150,13 +150,11 @@ public void createMember(Member member) { } @Override - public void remove(Member memberArg) { + public void remove(Long id) { Member member = - Optional.ofNullable(this.getById(memberArg.getId())) + Optional.ofNullable(this.getById(id)) .orElseThrow( - () -> - new ApiAlertException( - String.format("The member [id=%s] not found", memberArg.getId()))); + () -> new ApiAlertException(String.format("The member [id=%s] not found", id))); this.removeById(member); userService.clearLastTeam(member.getUserId(), member.getTeamId()); } diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml index 2f968a3ff1..6ae24e8243 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml @@ -177,11 +177,11 @@ - select t.*, p.name as projectName from t_flink_app t left join t_flink_project p on t.project_id = p.id - where t.id = #{app.id} + where t.id = #{id} diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java index 0e7f37732c..10ca766f43 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java @@ -110,7 +110,7 @@ void testStartAppOnRemoteSessionMode() throws Exception { Application appParam = new Application(); appParam.setId(100000L); appParam.setTeamId(100000L); - Application application = applicationManageService.getApp(appParam); + Application application = applicationManageService.getApp(appParam.getId()); application.setFlinkClusterId(1L); application.setSqlId(100000L); application.setVersionId(1L);