Skip to content

Commit

Permalink
Merge branch 'dev' into streampark-console-Code-Comments-Rule
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengkezhou1 authored Jan 23, 2024
2 parents 815c39c + c89ceed commit 1f3bc12
Show file tree
Hide file tree
Showing 28 changed files with 86 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public RestResponse getAlertConfig(@RequestBody AlertConfigParams params) {
@PostMapping(value = "/list")
public RestResponse alertConfigsPaginationList(
@RequestBody AlertConfigParams params, RestRequest request) {
IPage<AlertConfigParams> page = alertConfigService.page(params, request);
IPage<AlertConfigParams> page = alertConfigService.page(params.getUserId(), request);
return RestResponse.success(page);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class ApplicationController {
@PostMapping("get")
@RequiresPermissions("app:detail")
public RestResponse get(Application app) {
Application application = applicationManageService.getApp(app);
Application application = applicationManageService.getApp(app.getId());
return RestResponse.success(application);
}

Expand Down Expand Up @@ -167,15 +167,15 @@ public RestResponse mapping(Application app) {
@PostMapping("revoke")
@RequiresPermissions("app:release")
public RestResponse revoke(Application app) {
applicationActionService.revoke(app);
applicationActionService.revoke(app.getId());
return RestResponse.success();
}

@PermissionAction(id = "#app.id", type = PermissionTypeEnum.APP)
@PostMapping(value = "check_start")
@RequiresPermissions("app:start")
public RestResponse checkStart(Application app) {
AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app);
AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app.getId());
return RestResponse.success(stateEnum.get());
}

Expand Down Expand Up @@ -286,7 +286,7 @@ public RestResponse clean(Application app) {
@PostMapping("forcedStop")
@RequiresPermissions("app:cancel")
public RestResponse forcedStop(Application app) {
applicationActionService.forcedStop(app);
applicationActionService.forcedStop(app.getId());
return RestResponse.success();
}

Expand All @@ -299,7 +299,7 @@ public RestResponse yarn() {
@Operation(summary = "Get application on yarn name")
@PostMapping("name")
public RestResponse yarnName(Application app) {
String yarnName = applicationInfoService.getYarnName(app);
String yarnName = applicationInfoService.getYarnName(app.getConfig());
return RestResponse.success(yarnName);
}

Expand All @@ -313,7 +313,7 @@ public RestResponse checkName(Application app) {
@Operation(summary = "Get application conf")
@PostMapping("readConf")
public RestResponse readConf(Application app) throws IOException {
String config = applicationInfoService.readConf(app);
String config = applicationInfoService.readConf(app.getConfig());
return RestResponse.success(config);
}

Expand Down Expand Up @@ -352,7 +352,7 @@ public RestResponse deleteOperationLog(Long id) {
@PostMapping("delete")
@RequiresPermissions("app:delete")
public RestResponse delete(Application app) throws InternalException {
Boolean deleted = applicationManageService.remove(app);
Boolean deleted = applicationManageService.remove(app.getId());
return RestResponse.success(deleted);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public RestResponse list(ApplicationConfig config, RestRequest request) {
@Operation(summary = "List application config histories")
@PostMapping("history")
public RestResponse history(Application application) {
List<ApplicationConfig> history = applicationConfigService.list(application);
List<ApplicationConfig> history = applicationConfigService.list(application.getId());
return RestResponse.success(history);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public RestResponse shutdown(FlinkCluster cluster) {
@Operation(summary = "Delete flink cluster")
@PostMapping("delete")
public RestResponse delete(FlinkCluster cluster) {
flinkClusterService.remove(cluster);
flinkClusterService.remove(cluster.getId());
return RestResponse.success();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public RestResponse get(String id) throws InternalException {
@Operation(summary = "List the applications sql histories")
@PostMapping("history")
public RestResponse sqlhistory(Application application) {
List<FlinkSql> sqlList = flinkSqlService.listFlinkSqlHistory(application);
List<FlinkSql> sqlList = flinkSqlService.listFlinkSqlHistory(application.getId());
return RestResponse.success(sqlList);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public RestResponse updateResource(@Valid Resource resource) {
@DeleteMapping("delete")
@RequiresPermissions("resource:delete")
public RestResponse deleteResource(@Valid Resource resource) {
this.resourceService.remove(resource);
this.resourceService.remove(resource.getId());
return RestResponse.success();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface ApplicationMapper extends BaseMapper<Application> {

IPage<Application> selectPage(Page<Application> page, @Param("app") Application application);

Application selectApp(@Param("app") Application application);
Application selectApp(@Param("id") Long id);

void persistMetrics(@Param("app") Application application);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ public interface ApplicationConfigService extends IService<ApplicationConfig> {
/**
* Retrieves the history of application configurations for a given application.
*
* @param appParam The application for which to retrieve the history.
* @param appId The application's id for which to retrieve the history.
* @return The list of application configurations representing the history.
*/
List<ApplicationConfig> list(Application appParam);
List<ApplicationConfig> list(Long appId);

/**
* Reads a template from a file or a database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
/**
* Remove flink cluster
*
* @param flinkCluster FlinkCluster to be remove
* @param id FlinkCluster id whitch to be removed
*/
void remove(FlinkCluster flinkCluster);
void remove(Long id);

/**
* Update flink cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ public interface FlinkSqlService extends IService<FlinkSql> {
/**
* Get all historical SQL through Application
*
* @param application Application
* @param appId Application id
* @return list of History FLinkSQL
*/
List<FlinkSql> listFlinkSqlHistory(Application application);
List<FlinkSql> listFlinkSqlHistory(Long appId);

/**
* Get FlinkSQL by Application id and Candidate Type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public interface ResourceService extends IService<Resource> {
/**
* delete resource
*
* @param resource
* @param id
*/
void remove(Resource resource);
void remove(Long id);

/**
* Get resource through team id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ public interface AlertConfigService extends IService<AlertConfig> {
/**
* Retrieves a page of {@link AlertConfigParams} objects based on the provided parameters.
*
* @param params The {@link AlertConfigParams} object containing the search criteria.
* @param params The {@link userId} object containing the search criteria.
* @param request The {@link RestRequest} object used for pagination and sorting.
* @return An {@link IPage} containing the retrieved {@link AlertConfigParams} objects.
*/
IPage<AlertConfigParams> page(AlertConfigParams params, RestRequest request);
IPage<AlertConfigParams> page(Long userId, RestRequest request);

/**
* check whether the relevant alarm configuration exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public class AlertConfigServiceImpl extends ServiceImpl<AlertConfigMapper, Alert
@Autowired private ApplicationInfoService applicationInfoService;

@Override
public IPage<AlertConfigParams> page(AlertConfigParams params, RestRequest request) {
public IPage<AlertConfigParams> page(Long userId, RestRequest request) {
// build query conditions
LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(params.getUserId() != null, AlertConfig::getUserId, params.getUserId());
wrapper.eq(userId != null, AlertConfig::getUserId, userId);

Page<AlertConfig> page = MybatisPager.getPage(request);
IPage<AlertConfig> resultPage = getBaseMapper().selectPage(page, wrapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ public interface ApplicationActionService extends IService<Application> {
/**
* Revokes access for the given application.
*
* @param appParam The application for which access needs to be revoked.
* @param appId The application's id for which access needs to be revoked.
* @throws ApplicationException if an error occurs while revoking access.
*/
void revoke(Application appParam) throws ApplicationException;
void revoke(Long appId) throws ApplicationException;

/**
* Cancels the given application. Throws an exception if cancellation fails.
Expand All @@ -64,7 +64,7 @@ public interface ApplicationActionService extends IService<Application> {
/**
* Forces the given application to stop.
*
* @param appParam the application to be stopped
* @param id the application's id which need to be stopped
*/
void forcedStop(Application appParam);
void forcedStop(Long id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ public interface ApplicationInfoService extends IService<Application> {
/**
* Gets the YARN name for the given application.
*
* @param appParam The application for which to retrieve the YARN name.
* @param appConfig The application's config for which to retrieve the YARN name.
* @return The YARN name of the application as a String.
*/
String getYarnName(Application appParam);
String getYarnName(String appConfig);

/**
* Checks if the given application exists in the system.
Expand All @@ -139,11 +139,11 @@ public interface ApplicationInfoService extends IService<Application> {
/**
* Reads the configuration for the given application and returns it as a String.
*
* @param appParam The application for which the configuration needs to be read.
* @param appConfig The application's config for which the configuration needs to be read.
* @return The configuration for the given application as a String.
* @throws IOException If an I/O error occurs while reading the configuration.
*/
String readConf(Application appParam) throws IOException;
String readConf(String appConfig) throws IOException;

/**
* Retrieves the main configuration value for the given Application.
Expand Down Expand Up @@ -226,10 +226,10 @@ public interface ApplicationInfoService extends IService<Application> {
/**
* check application before start
*
* @param appParam
* @param id the application's id which need to check before start.
* @return org.apache.streampark.console.core.enums.AppExistsStateEnum
*/
AppExistsStateEnum checkStart(Application appParam);
AppExistsStateEnum checkStart(Long id);

/**
* @param appName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,19 @@ public interface ApplicationManageService extends IService<Application> {
/**
* Deletes the given Application from the system.
*
* @param appParam The Application to be deleted.
* @param appId The Application's id which need to be deleted.
* @return True if the deletion was successful, false otherwise.
*/
Boolean remove(Application appParam);
Boolean remove(Long appId);

/**
* Retrieves the Application with the specified details from the system.
*
* @param appParam The Application object containing the details of the Application to retrieve.
* @param id The Application object's id.
* @return The Application object that matches the specified details, or null if no matching
* Application is found.
*/
Application getApp(Application appParam);
Application getApp(Long id);

/**
* Updates the release of the given application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,10 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
new ConcurrentHashMap<>();

@Override
public void revoke(Application appParam) throws ApplicationException {
Application application = getById(appParam.getId());
public void revoke(Long appId) throws ApplicationException {
Application application = getById(appId);
ApiAlertException.throwIfNull(
application,
String.format("The application id=%s not found, revoke failed.", appParam.getId()));
application, String.format("The application id=%s not found, revoke failed.", appId));

// 1) delete files that have been published to workspace
application.getFsOperator().delete(application.getAppHome());
Expand Down Expand Up @@ -206,10 +205,10 @@ public void restart(Application appParam) throws Exception {
}

@Override
public void forcedStop(Application appParam) {
CompletableFuture<SubmitResponse> startFuture = startFutureMap.remove(appParam.getId());
CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(appParam.getId());
Application application = this.baseMapper.selectApp(appParam);
public void forcedStop(Long id) {
CompletableFuture<SubmitResponse> startFuture = startFutureMap.remove(id);
CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(id);
Application application = this.baseMapper.selectApp(id);
if (isKubernetesApp(application)) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(), application.getJobId());
Expand All @@ -225,7 +224,7 @@ public void forcedStop(Application appParam) {
cancelFuture.cancel(true);
}
if (startFuture == null && cancelFuture == null) {
this.doStopped(appParam);
this.doStopped(id);
}
}

Expand Down Expand Up @@ -332,7 +331,7 @@ public void cancel(Application appParam) throws Exception {
applicationLogService.save(applicationLog);

if (throwable instanceof CancellationException) {
doStopped(application);
doStopped(application.getId());
} else {
log.error("stop flink job failed.", throwable);
application.setOptionState(OptionStateEnum.NONE.getValue());
Expand Down Expand Up @@ -500,7 +499,7 @@ public void start(Application appParam, boolean auto) throws Exception {
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
if (throwable instanceof CancellationException) {
doStopped(application);
doStopped(application.getId());
} else {
Application app = getById(appParam.getId());
app.setState(FlinkAppStateEnum.FAILED.getValue());
Expand Down Expand Up @@ -772,18 +771,18 @@ private Map<String, Object> getProperties(Application application) {
return properties;
}

private void doStopped(Application appParam) {
Application application = getById(appParam);
private void doStopped(Long id) {
Application application = getById(id);
application.setOptionState(OptionStateEnum.NONE.getValue());
application.setState(FlinkAppStateEnum.CANCELED.getValue());
application.setOptionTime(new Date());
updateById(application);
savePointService.expire(application.getId());
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
TrackId trackId = toTrackId(application);
k8SFlinkTrackMonitor.unWatching(trackId);
k8SFlinkTrackMonitor.doWatching(trackId);
} else {
FlinkAppHttpWatcher.unWatching(application.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,8 @@ public List<String> listHistoryUploadJars() {
}

@Override
public AppExistsStateEnum checkStart(Application appParam) {
Application application = getById(appParam.getId());
public AppExistsStateEnum checkStart(Long id) {
Application application = getById(id);
if (application == null) {
return AppExistsStateEnum.INVALID;
}
Expand Down Expand Up @@ -408,10 +408,10 @@ public String k8sStartLog(Long id, Integer offset, Integer limit) throws Excepti
}

@Override
public String getYarnName(Application appParam) {
public String getYarnName(String appConfig) {
String[] args = new String[2];
args[0] = "--name";
args[1] = appParam.getConfig();
args[1] = appConfig;
return ParameterCli.read(args);
}

Expand Down Expand Up @@ -479,8 +479,8 @@ private boolean existsByJobName(String jobName) {
}

@Override
public String readConf(Application appParam) throws IOException {
File file = new File(appParam.getConfig());
public String readConf(String appConfig) throws IOException {
File file = new File(appConfig);
String conf = org.apache.streampark.common.util.FileUtils.readFile(file);
return Base64.getEncoder().encodeToString(conf.getBytes());
}
Expand Down
Loading

0 comments on commit 1f3bc12

Please sign in to comment.