[ISSUE-2423][Feature][WIP] flink cluster failure alarm&failover #2809

Merged: 5 commits, Jul 1, 2023
Changes from 3 commits
@@ -75,4 +75,8 @@ public static boolean isStoppedState(ClusterState state) {
public static boolean isLostState(ClusterState state) {
return LOST.equals(state);
}

public static boolean isUnknownState(ClusterState state) {
return UNKNOWN.equals(state);
}
}
@@ -458,6 +458,9 @@ create table `t_flink_cluster` (
`exception` text comment 'exception information',
`cluster_state` tinyint default 0 comment 'cluster status (0: created but not started, 1: started, 2: stopped)',
`create_time` datetime not null default current_timestamp comment 'create time',
`start_time` datetime default null comment 'start time',
`end_time` datetime default null comment 'end time',
`alert_id` bigint default null comment 'alert id',
primary key (`id`,`cluster_name`),
unique key `id` (`cluster_id`,`address`,`execution_mode`)
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;
@@ -290,7 +290,10 @@ create table "public"."t_flink_cluster" (
"resolve_order" int4,
"exception" text collate "pg_catalog"."default",
"cluster_state" int2 default 0,
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"start_time" timestamp(6),
"end_time" timestamp(6),
"alert_id" int8
)
;
comment on column "public"."t_flink_cluster"."address" is 'url address of cluster';
@@ -309,6 +312,9 @@ comment on column "public"."t_flink_cluster"."k8s_rest_exposed_type" is 'k8s exp
comment on column "public"."t_flink_cluster"."k8s_conf" is 'the path where the k8s configuration file is located';
comment on column "public"."t_flink_cluster"."exception" is 'exception information';
comment on column "public"."t_flink_cluster"."cluster_state" is 'cluster status (0: created but not started, 1: started, 2: stopped)';
comment on column "public"."t_flink_cluster"."start_time" is 'cluster start time';
comment on column "public"."t_flink_cluster"."end_time" is 'cluster end time';
comment on column "public"."t_flink_cluster"."alert_id" is 'alert id';
alter table "public"."t_flink_cluster" add constraint "t_flink_cluster_pkey" primary key ("id", "cluster_name");
create index "id" on "public"."t_flink_cluster" using btree (
"cluster_id" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc nulls last,
@@ -44,7 +44,10 @@ alter table `t_flink_sql`
add column `team_resource` varchar(64) default null;

alter table `t_flink_cluster`
add column `job_manager_url` varchar(150) default null comment 'url address of jobmanager' after `address`;
add column `job_manager_url` varchar(150) default null comment 'url address of jobmanager' after `address`,
add column `start_time` datetime default null comment 'start time',
add column `end_time` datetime default null comment 'end time',
add column `alert_id` bigint default null comment 'alert id';
Comment on lines +48 to +50
Contributor:
Do we need to split it into two statements based on PR granularity here?
CC @wolfboys

Contributor Author:
I believe these three statements all belong to the flink cluster alarm feature, so I don't think it is necessary to split them into multiple statements. If I am wrong, please correct me. CC @wolfboys
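
For reference, a sketch of the split form being discussed (illustration only, equivalent in effect to the single statement above; MySQL syntax as in the upgrade script):

-- sketch: the jobmanager-url change and the alarm-related columns as separate statements
alter table `t_flink_cluster`
add column `job_manager_url` varchar(150) default null comment 'url address of jobmanager' after `address`;

alter table `t_flink_cluster`
add column `start_time` datetime default null comment 'start time',
add column `end_time` datetime default null comment 'end time',
add column `alert_id` bigint default null comment 'alert id';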


-- menu level 2
insert into `t_menu` values (120400, 120000, 'menu.resource', '/flink/resource', 'flink/resource/View', null, 'apartment', '0', 1, 3, now(), now());
@@ -57,7 +57,10 @@ alter table "public"."t_flink_sql"
add column "team_resource" varchar(64) default null;

alter table "public"."t_flink_cluster"
add column "job_manager_url" varchar(150) collate "pg_catalog"."default";
add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
add column "start_time" timestamp(6) collate "pg_catalog"."default",
add column "end_time" timestamp(6) collate "pg_catalog"."default",
add column "alert_id" int8 collate "pg_catalog"."default";
Comment on lines +61 to +63
Contributor @RocMarshal, Jun 26, 2023:
Do we need to split it into two statements based on PR granularity here?
CC @wolfboys


insert into "public"."t_menu" values (120400, 120000, 'menu.resource', '/flink/resource', 'flink/resource/View', null, 'apartment', '0', '1', 3, now(), now());
insert into "public"."t_menu" values (110401, 110400, 'add', null, null, 'token:add', null, '1', '1', null, now(), now());
@@ -17,10 +17,12 @@

package org.apache.streampark.console.core.bean;

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.DateUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.CheckPointStatus;
import org.apache.streampark.console.core.enums.FlinkAppState;

@@ -47,61 +49,181 @@ public class AlertTemplate implements Serializable {
private Integer restartIndex;
private Integer totalRestart;
private boolean atAll = false;

private static AlertTemplate of(Application application) {
long duration;
if (application.getEndTime() == null) {
duration = System.currentTimeMillis() - application.getStartTime().getTime();
} else {
duration = application.getEndTime().getTime() - application.getStartTime().getTime();
}
AlertTemplate template = new AlertTemplate();
template.setJobName(application.getJobName());

if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
String format = "%s/proxy/%s/";
String url = String.format(format, YarnUtils.getRMWebAppURL(false), application.getAppId());
template.setLink(url);
} else {
template.setLink(null);
}

template.setStartTime(
DateUtils.format(
application.getStartTime(), DateUtils.fullFormat(), TimeZone.getDefault()));
template.setEndTime(
DateUtils.format(
application.getEndTime() == null ? new Date() : application.getEndTime(),
DateUtils.fullFormat(),
TimeZone.getDefault()));
template.setDuration(DateUtils.toDuration(duration));
boolean needRestart = application.isNeedRestartOnFailed() && application.getRestartCount() > 0;
template.setRestart(needRestart);
if (needRestart) {
template.setRestartIndex(application.getRestartCount());
template.setTotalRestart(application.getRestartSize());
}
return template;
}
private Integer affectedJobs;

public static AlertTemplate of(Application application, FlinkAppState appState) {
AlertTemplate template = of(application);
template.setType(1);
template.setTitle(String.format("Notify: %s %s", application.getJobName(), appState.name()));
template.setSubject(String.format("StreamPark Alert: %s %s", template.getJobName(), appState));
template.setStatus(appState.name());
return template;
return new AlertTemplateBuilder()
.setDuration(application.getStartTime(), application.getEndTime())
.setJobName(application.getJobName())
.setLink(application.getExecutionModeEnum(), application.getAppId())
.setStartTime(application.getStartTime())
.setEndTime(application.getEndTime())
.setRestart(application.isNeedRestartOnFailed(), application.getRestartCount())
.setRestartIndex(application.getRestartCount())
.setTotalRestart(application.getRestartSize())
.setType(1)
.setTitle(String.format("Notify: %s %s", application.getJobName(), appState.name()))
.setSubject(String.format("StreamPark Alert: %s %s", application.getJobName(), appState))
.setStatus(appState.name())
.build();
}

public static AlertTemplate of(Application application, CheckPointStatus checkPointStatus) {
AlertTemplate template = of(application);
template.setType(2);
template.setCpFailureRateInterval(
DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 60));
template.setCpMaxFailureInterval(application.getCpMaxFailureInterval());
template.setTitle(String.format("Notify: %s checkpoint FAILED", application.getJobName()));
template.setSubject(
String.format("StreamPark Alert: %s, checkPoint is Failed", template.getJobName()));
return template;
return new AlertTemplateBuilder()
.setDuration(application.getStartTime(), application.getEndTime())
.setJobName(application.getJobName())
.setLink(application.getExecutionModeEnum(), application.getAppId())
.setStartTime(application.getStartTime())
.setType(2)
.setCpFailureRateInterval(
DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 60))
.setCpMaxFailureInterval(application.getCpMaxFailureInterval())
.setTitle(String.format("Notify: %s checkpoint FAILED", application.getJobName()))
.setSubject(
String.format("StreamPark Alert: %s, checkPoint is Failed", application.getJobName()))
.build();
}

public static AlertTemplate of(FlinkCluster cluster, ClusterState clusterState) {
return new AlertTemplateBuilder()
.setDuration(cluster.getStartTime(), cluster.getEndTime())
.setJobName(cluster.getClusterName())
.setLink(ExecutionMode.YARN_SESSION, cluster.getClusterId())
.setStartTime(cluster.getStartTime())
.setEndTime(cluster.getEndTime())
.setType(3)
.setTitle(String.format("Notify: %s %s", cluster.getClusterName(), clusterState.name()))
.setSubject(
String.format("StreamPark Alert: %s %s", cluster.getClusterName(), clusterState))
.setStatus(clusterState.name())
.setAffectedJobs(cluster.getJobs())
.build();
}

private static class AlertTemplateBuilder {
private AlertTemplate alertTemplate = new AlertTemplate();

public AlertTemplateBuilder setTitle(String title) {
alertTemplate.setTitle(title);
return this;
}

public AlertTemplateBuilder setSubject(String subject) {
alertTemplate.setSubject(subject);
return this;
}

public AlertTemplateBuilder setJobName(String jobName) {
alertTemplate.setJobName(jobName);
return this;
}

public AlertTemplateBuilder setType(Integer type) {
alertTemplate.setType(type);
return this;
}

public AlertTemplateBuilder setStatus(String status) {
alertTemplate.setStatus(status);
return this;
}

public AlertTemplateBuilder setStartTime(Date startTime) {
alertTemplate.setStartTime(
DateUtils.format(startTime, DateUtils.fullFormat(), TimeZone.getDefault()));
return this;
}

public AlertTemplateBuilder setEndTime(Date endTime) {
alertTemplate.setEndTime(
DateUtils.format(
endTime == null ? new Date() : endTime,
DateUtils.fullFormat(),
TimeZone.getDefault()));
return this;
}

public AlertTemplateBuilder setDuration(String duration) {
alertTemplate.setDuration(duration);
return this;
}

public AlertTemplateBuilder setDuration(Date start, Date end) {
long duration;
if (start == null && end == null) {
duration = 0L;
} else if (end == null) {
duration = System.currentTimeMillis() - start.getTime();
} else {
duration = end.getTime() - start.getTime();
}
alertTemplate.setDuration(DateUtils.toDuration(duration));
return this;
}

public AlertTemplateBuilder setLink(String link) {
alertTemplate.setLink(link);
return this;
}

public AlertTemplateBuilder setLink(ExecutionMode mode, String appId) {
if (ExecutionMode.isYarnMode(mode)) {
String format = "%s/proxy/%s/";
String url = String.format(format, YarnUtils.getRMWebAppURL(false), appId);
alertTemplate.setLink(url);
} else {
alertTemplate.setLink(null);
}
return this;
}

public AlertTemplateBuilder setCpFailureRateInterval(String cpFailureRateInterval) {
alertTemplate.setCpFailureRateInterval(cpFailureRateInterval);
return this;
}

public AlertTemplateBuilder setCpMaxFailureInterval(Integer cpMaxFailureInterval) {
alertTemplate.setCpMaxFailureInterval(cpMaxFailureInterval);
return this;
}

public AlertTemplateBuilder setRestart(Boolean restart) {
alertTemplate.setRestart(restart);
return this;
}

public AlertTemplateBuilder setRestart(Boolean needRestartOnFailed, Integer restartCount) {
boolean needRestart = needRestartOnFailed && restartCount > 0;
alertTemplate.setRestart(needRestart);
return this;
}

public AlertTemplateBuilder setRestartIndex(Integer restartIndex) {
if (alertTemplate.getRestart()) {
alertTemplate.setRestartIndex(restartIndex);
}
return this;
}

public AlertTemplateBuilder setTotalRestart(Integer totalRestart) {
if (alertTemplate.getRestart()) {
alertTemplate.setTotalRestart(totalRestart);
}
return this;
}

public AlertTemplateBuilder setAtAll(Boolean atAll) {
alertTemplate.setAtAll(atAll);
return this;
}

public AlertTemplateBuilder setAffectedJobs(Integer jobs) {
alertTemplate.setAffectedJobs(jobs);
return this;
}

public AlertTemplate build() {
return this.alertTemplate;
}
}
}
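
A brief usage sketch (not part of the diff): the new cluster overload builds a type-3 template the same way the existing application overloads build type-1 and type-2 templates; ClusterState.LOST and the cluster variable are assumed for illustration.

// Illustrative only: build a cluster-failure alert template with the new factory method.
AlertTemplate template = AlertTemplate.of(cluster, ClusterState.LOST);
// type is 3, title/subject carry the cluster name and state,
// and affectedJobs mirrors cluster.getJobs().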
@@ -107,6 +107,14 @@ public class FlinkCluster implements Serializable {

private Date createTime = new Date();

private Date startTime;

private Date endTime;

private Integer alertId;

private transient Integer jobs = 0;

@JsonIgnore
public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() {
return FlinkK8sRestExposedType.of(this.k8sRestExposedType);
@@ -65,4 +65,6 @@ List<String> getRecentK8sClusterId(
boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId);

boolean existsJobByClusterId(@Param("clusterId") Long clusterId);

Integer getJobByClusterId(@Param("clusterId") Long clusterId);
}
@@ -103,9 +103,11 @@ List<Application> getByTeamIdAndExecutionModes(

boolean existsRunningJobByClusterId(Long clusterId);

boolean existsJobByClusterId(Long id);
boolean existsJobByClusterId(Long clusterId);

boolean existsJobByFlinkEnvId(Long id);
Integer getJobByClusterId(Long clusterId);

boolean existsJobByFlinkEnvId(Long flinkEnvId);

List<String> getRecentK8sNamespace();

@@ -17,10 +17,12 @@

package org.apache.streampark.console.core.service.alert;

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.core.bean.AlertConfigWithParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.CheckPointStatus;
import org.apache.streampark.console.core.enums.FlinkAppState;

@@ -30,5 +32,7 @@ public interface AlertService {

void alert(Application application, FlinkAppState appState);

void alert(FlinkCluster flinkCluster, ClusterState clusterState);

boolean alert(AlertConfigWithParams params, AlertTemplate alertTemplate) throws AlertException;
}
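
A sketch of how the new overload might be invoked by cluster-monitoring code; only AlertService.alert(FlinkCluster, ClusterState) and the ClusterState helpers come from this PR, while the ClusterHealthChecker class itself is an assumed example.

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.service.alert.AlertService;

// Hypothetical caller (not part of this PR): raise a cluster alarm once the
// cluster is observed in LOST or UNKNOWN state.
public class ClusterHealthChecker {

  private final AlertService alertService;

  public ClusterHealthChecker(AlertService alertService) {
    this.alertService = alertService;
  }

  public void check(FlinkCluster cluster, ClusterState state) {
    if (ClusterState.isLostState(state) || ClusterState.isUnknownState(state)) {
      alertService.alert(cluster, state);
    }
  }
}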