diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java index cefe347c7e..decfc770a5 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java @@ -75,4 +75,8 @@ public static boolean isStoppedState(ClusterState state) { public static boolean isLostState(ClusterState state) { return LOST.equals(state); } + + public static boolean isUnknownState(ClusterState state) { + return UNKNOWN.equals(state); + } } diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql index 01d0371918..7783e6c66f 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql @@ -458,6 +458,9 @@ create table `t_flink_cluster` ( `exception` text comment 'exception information', `cluster_state` tinyint default 0 comment 'cluster status (0: created but not started, 1: started, 2: stopped)', `create_time` datetime not null default current_timestamp comment 'create time', + `start_time` datetime default null comment 'start time', + `end_time` datetime default null comment 'end time', + `alert_id` bigint default null comment 'alert id', primary key (`id`,`cluster_name`), unique key `id` (`cluster_id`,`address`,`execution_mode`) ) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql index 1e68b967b7..b1b8d1b1d9 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql @@ -290,7 +290,10 @@ create table "public"."t_flink_cluster" ( "resolve_order" int4, "exception" text collate "pg_catalog"."default", "cluster_state" int2 default 0, - "create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone) + "create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone), + "start_time" timestamp(6), + "end_time" timestamp(6), + "alert_id" int8 ) ; comment on column "public"."t_flink_cluster"."address" is 'url address of cluster'; @@ -309,6 +312,9 @@ comment on column "public"."t_flink_cluster"."k8s_rest_exposed_type" is 'k8s exp comment on column "public"."t_flink_cluster"."k8s_conf" is 'the path where the k 8 s configuration file is located'; comment on column "public"."t_flink_cluster"."exception" is 'exception information'; comment on column "public"."t_flink_cluster"."cluster_state" is 'cluster status (0: create not started, 1: started, 2: stopped)'; +comment on column "public"."t_flink_cluster"."start_time" is 'cluster start time'; +comment on column "public"."t_flink_cluster"."end_time" is 'cluster end time'; +comment on column "public"."t_flink_cluster"."alert_id" is 'alert id'; alter table "public"."t_flink_cluster" add constraint "t_flink_cluster_pkey" primary key ("id", "cluster_name"); create index "id" on "public"."t_flink_cluster" using btree ( "cluster_id" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc nulls last, diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql index 87406e9e83..81db6ac9b4 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql @@ -44,7 +44,10 @@ alter table `t_flink_sql` add column `team_resource` varchar(64) default null; alter table `t_flink_cluster` - add column `job_manager_url` varchar(150) default null comment 'url address of jobmanager' after `address`; + add column `job_manager_url` varchar(150) default null comment 'url address of jobmanager' after `address`, + add column `start_time` datetime default null comment 'start time', + add column `end_time` datetime default null comment 'end time', + add column `alert_id` bigint default null comment 'alert id'; -- menu level 2 insert into `t_menu` values (120400, 120000, 'menu.resource', '/flink/resource', 'flink/resource/View', null, 'apartment', '0', 1, 3, now(), now()); diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql index f2720b43fb..9352cd563d 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql @@ -57,7 +57,10 @@ alter table "public"."t_flink_sql" add column "team_resource" varchar(64) default null; alter table "public"."t_flink_cluster" - add column "job_manager_url" varchar(150) collate "pg_catalog"."default"; + add column "job_manager_url" varchar(150) collate "pg_catalog"."default", + add column "start_time" timestamp(6) collate "pg_catalog"."default", + add column "end_time" timestamp(6) collate "pg_catalog"."default", + add column "alert_id" int8 collate "pg_catalog"."default"; insert into "public"."t_menu" values (120400, 120000, 'menu.resource', '/flink/resource', 'flink/resource/View', null, 'apartment', '0', '1', 3, now(), now()); insert into "public"."t_menu" values (110401, 110400, 'add', null, null, 'token:add', null, '1', '1', null, now(), now()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java index fa6487707b..b9a148349d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java @@ -17,10 +17,12 @@ package org.apache.streampark.console.core.bean; +import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.util.DateUtils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.core.entity.Application; +import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.enums.CheckPointStatus; import org.apache.streampark.console.core.enums.FlinkAppState; @@ -47,61 +49,181 @@ public class AlertTemplate implements Serializable { private Integer restartIndex; private Integer totalRestart; private boolean atAll = false; - - private static AlertTemplate of(Application application) { - long duration; - if (application.getEndTime() == null) { - duration = System.currentTimeMillis() - application.getStartTime().getTime(); - } else { - duration = application.getEndTime().getTime() - application.getStartTime().getTime(); - } - AlertTemplate template = new AlertTemplate(); - template.setJobName(application.getJobName()); - - if (ExecutionMode.isYarnMode(application.getExecutionMode())) { - String format = "%s/proxy/%s/"; - String url = String.format(format, YarnUtils.getRMWebAppURL(false), application.getAppId()); - template.setLink(url); - } else { - template.setLink(null); - } - - template.setStartTime( - DateUtils.format( - application.getStartTime(), DateUtils.fullFormat(), TimeZone.getDefault())); - template.setEndTime( - DateUtils.format( - application.getEndTime() == null ? new Date() : application.getEndTime(), - DateUtils.fullFormat(), - TimeZone.getDefault())); - template.setDuration(DateUtils.toDuration(duration)); - boolean needRestart = application.isNeedRestartOnFailed() && application.getRestartCount() > 0; - template.setRestart(needRestart); - if (needRestart) { - template.setRestartIndex(application.getRestartCount()); - template.setTotalRestart(application.getRestartSize()); - } - return template; - } + private Integer affectedJobs; public static AlertTemplate of(Application application, FlinkAppState appState) { - AlertTemplate template = of(application); - template.setType(1); - template.setTitle(String.format("Notify: %s %s", application.getJobName(), appState.name())); - template.setSubject(String.format("StreamPark Alert: %s %s", template.getJobName(), appState)); - template.setStatus(appState.name()); - return template; + return new AlertTemplateBuilder() + .setDuration(application.getStartTime(), application.getEndTime()) + .setJobName(application.getJobName()) + .setLink(application.getExecutionModeEnum(), application.getAppId()) + .setStartTime(application.getStartTime()) + .setEndTime(application.getEndTime()) + .setRestart(application.isNeedRestartOnFailed(), application.getRestartCount()) + .setRestartIndex(application.getRestartCount()) + .setTotalRestart(application.getRestartSize()) + .setType(1) + .setTitle(String.format("Notify: %s %s", application.getJobName(), appState.name())) + .setSubject(String.format("StreamPark Alert: %s %s", application.getJobName(), appState)) + .setStatus(appState.name()) + .build(); } public static AlertTemplate of(Application application, CheckPointStatus checkPointStatus) { - AlertTemplate template = of(application); - template.setType(2); - template.setCpFailureRateInterval( - DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 60)); - template.setCpMaxFailureInterval(application.getCpMaxFailureInterval()); - template.setTitle(String.format("Notify: %s checkpoint FAILED", application.getJobName())); - template.setSubject( - String.format("StreamPark Alert: %s, checkPoint is Failed", template.getJobName())); - return template; + return new AlertTemplateBuilder() + .setDuration(application.getStartTime(), application.getEndTime()) + .setJobName(application.getJobName()) + .setLink(application.getExecutionModeEnum(), application.getAppId()) + .setStartTime(application.getStartTime()) + .setType(2) + .setCpFailureRateInterval( + DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 60)) + .setCpMaxFailureInterval(application.getCpMaxFailureInterval()) + .setTitle(String.format("Notify: %s checkpoint FAILED", application.getJobName())) + .setSubject( + String.format("StreamPark Alert: %s, checkPoint is Failed", application.getJobName())) + .build(); + } + + public static AlertTemplate of(FlinkCluster cluster, ClusterState clusterState) { + return new AlertTemplateBuilder() + .setDuration(cluster.getStartTime(), cluster.getEndTime()) + .setJobName(cluster.getClusterName()) + .setLink(ExecutionMode.YARN_SESSION, cluster.getClusterId()) + .setStartTime(cluster.getStartTime()) + .setEndTime(cluster.getEndTime()) + .setType(3) + .setTitle(String.format("Notify: %s %s", cluster.getClusterName(), clusterState.name())) + .setSubject( + String.format("StreamPark Alert: %s %s", cluster.getClusterName(), clusterState)) + .setStatus(clusterState.name()) + .setAffectedJobs(cluster.getJobs()) + .build(); + } + + private static class AlertTemplateBuilder { + private AlertTemplate alertTemplate = new AlertTemplate(); + + public AlertTemplateBuilder setTitle(String title) { + alertTemplate.setTitle(title); + return this; + } + + public AlertTemplateBuilder setSubject(String subject) { + alertTemplate.setSubject(subject); + return this; + } + + public AlertTemplateBuilder setJobName(String jobName) { + alertTemplate.setJobName(jobName); + return this; + } + + public AlertTemplateBuilder setType(Integer type) { + alertTemplate.setType(type); + return this; + } + + public AlertTemplateBuilder setStatus(String status) { + alertTemplate.setStatus(status); + return this; + } + + public AlertTemplateBuilder setStartTime(Date startTime) { + alertTemplate.setStartTime( + DateUtils.format(startTime, DateUtils.fullFormat(), TimeZone.getDefault())); + return this; + } + + public AlertTemplateBuilder setEndTime(Date endTime) { + alertTemplate.setEndTime( + DateUtils.format( + endTime == null ? new Date() : endTime, + DateUtils.fullFormat(), + TimeZone.getDefault())); + return this; + } + + public AlertTemplateBuilder setDuration(String duration) { + alertTemplate.setDuration(duration); + return this; + } + + public AlertTemplateBuilder setDuration(Date start, Date end) { + long duration; + if (start == null && end == null) { + duration = 0L; + } else if (end == null) { + duration = System.currentTimeMillis() - start.getTime(); + } else { + duration = end.getTime() - start.getTime(); + } + alertTemplate.setDuration(DateUtils.toDuration(duration)); + return this; + } + + public AlertTemplateBuilder setLink(String link) { + alertTemplate.setLink(link); + return this; + } + + public AlertTemplateBuilder setLink(ExecutionMode mode, String appId) { + if (ExecutionMode.isYarnMode(mode)) { + String format = "%s/proxy/%s/"; + String url = String.format(format, YarnUtils.getRMWebAppURL(false), appId); + alertTemplate.setLink(url); + } else { + alertTemplate.setLink(null); + } + return this; + } + + public AlertTemplateBuilder setCpFailureRateInterval(String cpFailureRateInterval) { + alertTemplate.setCpFailureRateInterval(cpFailureRateInterval); + return this; + } + + public AlertTemplateBuilder setCpMaxFailureInterval(Integer cpMaxFailureInterval) { + alertTemplate.setCpMaxFailureInterval(cpMaxFailureInterval); + return this; + } + + public AlertTemplateBuilder setRestart(Boolean restart) { + alertTemplate.setRestart(restart); + return this; + } + + public AlertTemplateBuilder setRestart(Boolean needRestartOnFailed, Integer restartCount) { + boolean needRestart = needRestartOnFailed && restartCount > 0; + alertTemplate.setRestart(needRestart); + return this; + } + + public AlertTemplateBuilder setRestartIndex(Integer restartIndex) { + if (alertTemplate.getRestart()) { + alertTemplate.setRestartIndex(restartIndex); + } + return this; + } + + public AlertTemplateBuilder setTotalRestart(Integer totalRestart) { + if (alertTemplate.getRestart()) { + alertTemplate.setTotalRestart(totalRestart); + } + return this; + } + + public AlertTemplateBuilder setAtAll(Boolean atAll) { + alertTemplate.setAtAll(atAll); + return this; + } + + public AlertTemplateBuilder setAffectedJobs(Integer jobs) { + alertTemplate.setAffectedJobs(jobs); + return this; + } + + public AlertTemplate build() { + return this.alertTemplate; + } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java index 7bfa52e729..32feec747b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java @@ -107,6 +107,14 @@ public class FlinkCluster implements Serializable { private Date createTime = new Date(); + private Date startTime; + + private Date endTime; + + private Integer alertId; + + private transient Integer jobs = 0; + @JsonIgnore public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() { return FlinkK8sRestExposedType.of(this.k8sRestExposedType); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java index dc7c129b64..d1b59fa07f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java @@ -65,4 +65,6 @@ List getRecentK8sClusterId( boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId); boolean existsJobByClusterId(@Param("clusterId") Long clusterId); + + Integer getAffectedJobsByClusterId(@Param("clusterId") Long clusterId); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java index f78dcf6f24..a64922afae 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java @@ -103,9 +103,11 @@ List getByTeamIdAndExecutionModes( boolean existsRunningJobByClusterId(Long clusterId); - boolean existsJobByClusterId(Long id); + boolean existsJobByClusterId(Long clusterId); - boolean existsJobByFlinkEnvId(Long id); + Integer getAffectedJobsByClusterId(Long clusterId); + + boolean existsJobByFlinkEnvId(Long flinkEnvId); List getRecentK8sNamespace(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java index 4627f08317..d47b1ab624 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java @@ -17,10 +17,12 @@ package org.apache.streampark.console.core.service.alert; +import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.console.base.exception.AlertException; import org.apache.streampark.console.core.bean.AlertConfigWithParams; import org.apache.streampark.console.core.bean.AlertTemplate; import org.apache.streampark.console.core.entity.Application; +import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.enums.CheckPointStatus; import org.apache.streampark.console.core.enums.FlinkAppState; @@ -30,5 +32,7 @@ public interface AlertService { void alert(Application application, FlinkAppState appState); + void alert(FlinkCluster flinkCluster, ClusterState clusterState); + boolean alert(AlertConfigWithParams params, AlertTemplate alertTemplate) throws AlertException; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java index 19a40249ec..05b1b8294b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.service.alert.impl; +import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.exception.AlertException; import org.apache.streampark.console.base.util.SpringContextUtils; @@ -24,6 +25,7 @@ import org.apache.streampark.console.core.bean.AlertTemplate; import org.apache.streampark.console.core.entity.AlertConfig; import org.apache.streampark.console.core.entity.Application; +import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.enums.AlertType; import org.apache.streampark.console.core.enums.CheckPointStatus; import org.apache.streampark.console.core.enums.FlinkAppState; @@ -48,17 +50,22 @@ public class AlertServiceImpl implements AlertService { @Override public void alert(Application application, CheckPointStatus checkPointStatus) { AlertTemplate alertTemplate = AlertTemplate.of(application, checkPointStatus); - alert(application, alertTemplate); + alert(application.getAlertId(), alertTemplate); } @Override public void alert(Application application, FlinkAppState appState) { AlertTemplate alertTemplate = AlertTemplate.of(application, appState); - alert(application, alertTemplate); + alert(application.getAlertId(), alertTemplate); } - private void alert(Application application, AlertTemplate alertTemplate) { - Integer alertId = application.getAlertId(); + @Override + public void alert(FlinkCluster flinkCluster, ClusterState clusterState) { + AlertTemplate alertTemplate = AlertTemplate.of(flinkCluster, clusterState); + alert(flinkCluster.getAlertId(), alertTemplate); + } + + private void alert(Integer alertId, AlertTemplate alertTemplate) { if (alertId == null) { return; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 73e76e6e9b..98253bb9f0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -544,6 +544,11 @@ public boolean existsJobByClusterId(Long clusterId) { return baseMapper.existsJobByClusterId(clusterId); } + @Override + public Integer getAffectedJobsByClusterId(Long clusterId) { + return baseMapper.getAffectedJobsByClusterId(clusterId); + } + @Override public boolean existsJobByFlinkEnvId(Long flinkEnvId) { LambdaQueryWrapper lambdaQueryWrapper = diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index 31d4ead6fb..769ddf6265 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -142,6 +142,8 @@ public Boolean create(FlinkCluster flinkCluster) { flinkCluster.setCreateTime(new Date()); if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) { flinkCluster.setClusterState(ClusterState.RUNNING.getValue()); + flinkCluster.setStartTime(new Date()); + flinkCluster.setEndTime(null); } else { flinkCluster.setClusterState(ClusterState.CREATED.getValue()); } @@ -172,6 +174,8 @@ public void start(FlinkCluster cluster) { flinkCluster.setClusterId(deployResponse.clusterId()); flinkCluster.setClusterState(ClusterState.RUNNING.getValue()); flinkCluster.setException(null); + flinkCluster.setStartTime(new Date()); + flinkCluster.setEndTime(null); FlinkClusterWatcher.addFlinkCluster(flinkCluster); updateById(flinkCluster); } catch (Exception e) { @@ -222,6 +226,7 @@ public void shutdown(FlinkCluster cluster) { ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response failed"); flinkCluster.setAddress(null); flinkCluster.setClusterState(ClusterState.STOPPED.getValue()); + flinkCluster.setEndTime(new Date()); FlinkClusterWatcher.removeFlinkCluster(flinkCluster); updateById(flinkCluster); } catch (Exception e) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java index 2ecb2c7de8..29131f651c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java @@ -26,7 +26,9 @@ import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.metrics.flink.Overview; import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo; +import org.apache.streampark.console.core.service.ApplicationService; import org.apache.streampark.console.core.service.FlinkClusterService; +import org.apache.streampark.console.core.service.alert.AlertService; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -43,6 +45,7 @@ import javax.annotation.PreDestroy; import java.time.Duration; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -58,6 +61,10 @@ public class FlinkClusterWatcher { @Autowired private FlinkClusterService flinkClusterService; + @Autowired private AlertService alertService; + + @Autowired private ApplicationService applicationService; + private Long lastWatcheringTime = 0L; // Track interval every 30 seconds @@ -107,17 +114,54 @@ private void watcher() { EXECUTOR.execute( () -> { FlinkCluster flinkCluster = entry.getValue(); - Integer clusterExecutionMode = flinkCluster.getExecutionMode(); - if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) { - ClusterState state = getClusterState(flinkCluster); - handleClusterState(flinkCluster, state); - } else { - // TODO: K8s Session status monitoring - } + updateClusterState(flinkCluster); }); } } + private ClusterState updateClusterState(FlinkCluster flinkCluster) { + Integer clusterExecutionMode = flinkCluster.getExecutionMode(); + if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) { + ClusterState state = getClusterState(flinkCluster); + handleClusterState(flinkCluster, state); + return state; + } else { + // TODO: K8s Session status monitoring + return ClusterState.UNKNOWN; + } + } + + public synchronized boolean verifyClusterValidByClusterId(Long clusterId) { + FlinkCluster flinkCluster = flinkClusterService.getById(clusterId); + ClusterState state = ClusterState.of(flinkCluster.getClusterState()); + if (!ClusterState.isRunningState(state)) { + return false; + } + state = updateClusterState(flinkCluster); + if (!ClusterState.isRunningState(state)) { + return false; + } + return true; + } + + public boolean checkAlert(Long clusterId) { + FlinkCluster flinkCluster = flinkClusterService.getById(clusterId); + if (flinkCluster.getAlertId() == null) { + return false; + } + return true; + } + + private void alert(FlinkCluster cluster, ClusterState state) { + if (!checkAlert(cluster.getId())) { + return; + } + cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId())); + cluster.setClusterState(state.getValue()); + cluster.setEndTime(new Date()); + alertService.alert(cluster, state); + } + /** * cluster get state from flink or yarn api * @@ -171,7 +215,7 @@ private ClusterState getClusterStateFromFlinkAPI(FlinkCluster flinkCluster) { */ private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) { if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) { - return ClusterState.STOPPED; + return ClusterState.LOST; } String clusterId = flinkCluster.getClusterId(); if (StringUtils.isEmpty(clusterId)) { @@ -214,13 +258,15 @@ private void handleClusterState(FlinkCluster flinkCluster, ClusterState state) { { updateWrapper .set(FlinkCluster::getAddress, null) - .set(FlinkCluster::getJobManagerUrl, null); + .set(FlinkCluster::getJobManagerUrl, null) + .set(FlinkCluster::getEndTime, new Date()); } // fall through case LOST: case UNKNOWN: { removeFlinkCluster(flinkCluster); + alert(flinkCluster, state); break; } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java index e9b9f2224d..737cc1bc60 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java @@ -80,6 +80,8 @@ public class FlinkRESTAPIWatcher { @Autowired private SavePointService savePointService; + @Autowired private FlinkClusterWatcher flinkClusterWatcher; + // track interval every 5 seconds private static final long WATCHING_INTERVAL = 1000L * 5; // option interval within 10 seconds @@ -228,7 +230,7 @@ private void watch(Long key, Application application) { if (StopFrom.NONE.equals(stopFrom)) { savePointService.expire(application.getId()); application.setState(FlinkAppState.LOST.getValue()); - alertService.alert(application, FlinkAppState.LOST); + alert(application, FlinkAppState.LOST); } else { application.setState(FlinkAppState.CANCELED.getValue()); } @@ -244,7 +246,7 @@ private void watch(Long key, Application application) { doPersistMetrics(application, true); FlinkAppState appState = FlinkAppState.of(application.getState()); if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) { - alertService.alert(application, FlinkAppState.of(application.getState())); + alert(application, FlinkAppState.of(application.getState())); if (appState.equals(FlinkAppState.FAILED)) { try { applicationService.start(application, true); @@ -456,7 +458,7 @@ private void handleNotRunState( savePointService.expire(application.getId()); } stopCanceledJob(application.getId()); - alertService.alert(application, FlinkAppState.CANCELED); + alert(application, FlinkAppState.CANCELED); } STOP_FROM_MAP.remove(application.getId()); doPersistMetrics(application, true); @@ -467,7 +469,7 @@ private void handleNotRunState( STOP_FROM_MAP.remove(application.getId()); application.setState(FlinkAppState.FAILED.getValue()); doPersistMetrics(application, true); - alertService.alert(application, FlinkAppState.FAILED); + alert(application, FlinkAppState.FAILED); applicationService.start(application, true); break; case RESTARTING: @@ -544,7 +546,7 @@ and the status is not obtained this time (flink rest server is closed), || flinkAppState.equals(FlinkAppState.LOST) || (flinkAppState.equals(FlinkAppState.CANCELED) && StopFrom.NONE.equals(stopFrom)) || applicationService.checkAlter(application)) { - alertService.alert(application, flinkAppState); + alert(application, flinkAppState); stopCanceledJob(application.getId()); if (flinkAppState.equals(FlinkAppState.FAILED)) { applicationService.start(application, true); @@ -768,4 +770,30 @@ private FlinkCluster getFlinkRemoteCluster(Long clusterId, boolean flush) { interface Callback { R call(T e) throws Exception; } + + /** + * The situation of abnormal operation alarm is as follows: When the job running mode is yarn per + * job or yarn application, when the job is abnormal, an alarm will be triggered directly; The job + * running mode is yarn session or reome: a. If the flink cluster is not configured with an alarm + * information, it will directly alarm when the job is abnormal. b. If the flink cluster is + * configured with alarm information: if the abnormal behavior of the job is caused by an + * abnormality in the flink cluster, block the alarm of the job and wait for the flink cluster + * alarm; If the abnormal behavior of the job is caused by itself and the flink cluster is running + * normally, the job will an alarm + */ + private void alert(Application app, FlinkAppState appState) { + if (ExecutionMode.isYarnPerJobOrAppMode(app.getExecutionModeEnum()) + || !flinkClusterWatcher.checkAlert(app.getFlinkClusterId())) { + alertService.alert(app, appState); + return; + } + boolean isValid = flinkClusterWatcher.verifyClusterValidByClusterId(app.getFlinkClusterId()); + if (isValid) { + log.info( + "application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert", + app.getId(), + app.getFlinkClusterId()); + alertService.alert(app, appState); + } + } } diff --git a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl index 14e30170b4..dd7331bace 100644 --- a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl +++ b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl @@ -5,7 +5,12 @@ ### **Dear StreamPark user:** > ** Oops! I'm sorry to inform you that something wrong with your app ** +<#if type == 1 || type == 2 > - **Job Name:${jobName}** + +<#if type == 3 > +- **Cluster Name:${jobName}** + <#if type == 1 > - **Job Status:${status}** - **Start Time:${startTime}** @@ -22,6 +27,13 @@ - **Start Time:${startTime}** - **Duration:${duration}** +<#if type == 3 > +- **Cluster Status:${status}** +- **Start Time:${startTime}** +- **End Time:${endTime}** +- **Duration:${duration}** +- **Affected Jobs:${affectedJobs}** + > Best Wishes! > diff --git a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl index 3cb0a93232..a3d6228580 100644 --- a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl +++ b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl @@ -967,6 +967,60 @@ + <#if mail.type == 3 > + + + Cluster Name + + + ${mail.jobName} + + + + + Cluster Status + + + ${mail.status} + + + + + Start Time + + + ${mail.startTime} + + + + + End Time + + + ${mail.endTime} + + + + + + Duration + + + ${mail.duration} + + + + + + Affected Jobs + + + ${mail.affectedJobs} + + + + +
Best Wishes! diff --git a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl index 2f7171fd7f..a33b5811d2 100644 --- a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl +++ b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl @@ -27,6 +27,7 @@ }, { "fields": [ +<#if type == 1 || type == 2> { "is_short": false, "text": { @@ -34,6 +35,16 @@ "tag": "lark_md" } }, + +<#if type == 3> + { + "is_short": false, + "text": { + "content": "**Cluster Name:${jobName}**", + "tag": "lark_md" + } + }, + <#if type == 1 > { "is_short": false, @@ -109,6 +120,43 @@ "tag": "lark_md" } } + +<#if type == 3 > + { + "is_short": false, + "text": { + "content": "**Cluster Status:${status}**", + "tag": "lark_md" + } + }, + { + "is_short": true, + "text": { + "content": "**Start Time:${startTime}**", + "tag": "lark_md" + } + }, + { + "is_short": false, + "text": { + "content": "**End Time:${endTime}**", + "tag": "lark_md" + } + }, + { + "is_short": true, + "text": { + "content": "**Duration:${duration}**", + "tag": "lark_md" + } + }, + { + "is_short": false, + "text": { + "content": "**Affected Jobs:${affectedJobs}**", + "tag": "lark_md" + } + } ], "tag": "div" diff --git a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl index 7d863c15d6..d72a468722 100644 --- a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl +++ b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl @@ -5,8 +5,12 @@ ### **Dear StreamPark user:** `Oops! I'm sorry to inform you that something wrong with your app` - +<#if type == 1 || type ==2 > - **Job Name:${jobName}** + +<#if type == 3 > +- **Cluster Name:${jobName}** + <#if type == 1 > - **Job Status:${status}** - **Start Time:${startTime}** @@ -23,6 +27,13 @@ - **Start Time:${startTime}** - **Duration:${duration}** +<#if type == 3 > +- **Cluster Status:${status}** +- **Start Time:${startTime}** +- **End Time:${endTime}** +- **Duration:${duration}** +- **Affected Jobs:${affectedJobs}** + > Best Wishes! > Apache StreamPark diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index fd625c21e5..fcb05c25e8 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -411,6 +411,9 @@ create table if not exists `t_flink_cluster` ( `exception` text comment 'exception information', `cluster_state` tinyint default 0 comment 'cluster status (0: created but not started, 1: started, 2: stopped)', `create_time` datetime not null default current_timestamp comment 'create time', + `start_time` datetime default null comment 'start time', + `end_time` datetime default null comment 'end time', + `alert_id` bigint default null comment 'alert id', primary key(`id`,`cluster_name`), unique (`cluster_id`,`address`,`execution_mode`) ); diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml index d8565cc768..cc01733879 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml @@ -133,6 +133,14 @@ limit 1 + + diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml index 91e7918477..769fe521c3 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml @@ -40,6 +40,9 @@ + + +