From 34990ba6ded8a2692fe65638f07df1dea3dae995 Mon Sep 17 00:00:00 2001 From: benjobs Date: Sun, 28 Jan 2024 12:21:52 +0800 Subject: [PATCH 1/2] [Improve] FE i18n improvement --- .../service/impl/ApplicationServiceImpl.java | 1 + .../task/FlinkK8sChangeEventListener.java | 5 +- .../src/locales/lang/en/flink/app.ts | 6 ++ .../src/locales/lang/zh-CN/flink/app.ts | 5 + .../app/components/AppDetail/DetailTab.vue | 96 +++++++++---------- .../AppView/StartApplicationModal.vue | 23 ++--- .../watcher/FlinkMetricsWatcher.scala | 39 ++++---- 7 files changed, 88 insertions(+), 87 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index f15a48d22d..6c1fc29daa 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -247,6 +247,7 @@ public Map dashboard(Long teamId) { if (!teamId.equals(app.getTeamId())) { continue; } + // 1) only yarn-application, yarn-perjob mode if (app.getJmMemory() != null) { totalJmMemory += app.getJmMemory(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java index a9e9962f98..9cc0777182 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java +++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java @@ -84,15 +84,16 @@ public void subscribeJobStatusChange(FlinkJobStatusChangeEvent event) { TrackId trackId = event.trackId(); // get pre application record Application app = applicationService.getById(trackId.appId()); - if (app == null) { + if (app == null || FlinkAppState.isEndState(app.getState())) { return; } + // update application record setByJobStatusCV(app, jobStatus); applicationService.persistMetrics(app); - // email alerts when necessary FlinkAppState state = FlinkAppState.of(app.getState()); + // email alerts when necessary if (FlinkAppState.FAILED.equals(state) || FlinkAppState.LOST.equals(state) || FlinkAppState.RESTARTING.equals(state) diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts index 366fef9d76..cd5a17f8e5 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts @@ -166,6 +166,12 @@ export default { start: 'Start Application', stop: 'Stop application', savepoint: 'Trigger Savepoint', + fromSavepoint: 'From savepoint', + savepointTip: 'Restore the job from savepoint or latest checkpoint', + savepointInput: + 'Select or manually specify the savepoint/checkpoint path', + ignoreRestored: 'Ignore restore failure', + ignoreRestoredTip: 'Skip the error and keep the job running when state restore fails, same as: -allowNonRestoredState(-n)', recheck: 'the associated project has changed and this job need to be rechecked', changed: 'the application has changed.', }, diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts index 863a909b70..d1f6b79f8a 100644 ---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts @@ -158,6 +158,11 @@ export default { savepoint: '触发 Savepoint', recheck: '关联的项目已更改,需要重新检查此作业', changed: '应用程序已更改。', + fromSavepoint: 'Savepoint 恢复', + savepointTip: '作业从 savepoint 或 checkpoint 恢复状态', + savepointInput: '选择或者手动指定 savepoint/checkpoint 路径', + ignoreRestored: '跳过恢复失败', + ignoreRestoredTip: '当状态恢复失败时跳过错误,作业继续运行, 同参数:-allowNonRestoredState(-n)', }, pod: { choice: '选择', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue index 0d7fa96a20..dc8853e5c2 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue @@ -246,13 +246,13 @@ /* delete configuration */ async function handleDeleteConf(record: Recordable) { await fetchRemoveConf({ id: record.id }); - reloadConf(); + await reloadConf(); } /* delete flink sql */ async function handleDeleteFlinkSql(record: Recordable) { await fetchRemoveFlinkSql({ id: record.id }); - reloadFlinkSql(); + await reloadFlinkSql(); } function handleCompare(record: Recordable) { @@ -345,17 +345,17 @@ /* delete savePoint */ async function handleDeleteSavePoint(record: Recordable) { await fetchRemoveSavePoint({ id: record.id }); - reloadSavePoint(); + await reloadSavePoint(); } async function handleDeleteBackup(record: Recordable) { await fetchRemoveBackup(record.id); - reloadBackup(); + await reloadBackup(); } async function handleDeleteOperationLog(record: Recordable) { await fetchDeleteOperationLog(record.id); - reloadOperationLog(); + await reloadOperationLog(); } /* copy path */ @@ -377,8 +377,43 @@ - - - - - diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue index 205b6173f7..88632acbbf 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue @@ -59,47 +59,36 @@ schemas: [ { field: 'startSavePointed', - label: 'from savepoint', + label: t('flink.app.view.fromSavepoint'), component: 'Switch', componentProps: { checkedChildren: 'ON', unCheckedChildren: 'OFF', }, defaultValue: true, - afterItem: () => - h( - 'span', - { class: 'conf-switch' }, - 'restore the application from savepoint or latest checkpoint', - ), + afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.savepointTip')), }, { field: 'startSavePoint', - label: 'savepoint', + label: 'Savepoint', component: receiveData.historySavePoint && receiveData.historySavePoint.length > 0 ? 
'Select' : 'Input', - afterItem: () => - h( - 'span', - { class: 'conf-switch' }, - 'restore the application from savepoint or latest checkpoint', - ), + afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.savepointInput')), slot: 'savepoint', ifShow: ({ values }) => values.startSavePointed, required: true, }, { field: 'allowNonRestoredState', - label: 'ignore restored', + label: t('flink.app.view.ignoreRestored'), component: 'Switch', componentProps: { checkedChildren: 'ON', unCheckedChildren: 'OFF', }, - afterItem: () => - h('span', { class: 'conf-switch' }, 'ignore savepoint then cannot be restored'), + afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.ignoreRestoredTip')), defaultValue: false, ifShow: ({ values }) => values.startSavePointed, }, diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala index 4dac87f53e..c8431a7dd9 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala @@ -19,6 +19,7 @@ package org.apache.streampark.flink.kubernetes.watcher import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig} +import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV, TrackId} @@ -81,25 +82,27 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default ) // 
retrieve flink metrics in thread pool val futures: Set[Future[Option[FlinkMetricCV]]] = - trackIds.map( - id => { - val future = Future(collectMetrics(id)) - future.onComplete(_.getOrElse(None) match { - case Some(metric) => - val clusterKey = id.toClusterKey - // update current flink cluster metrics on cache - watchController.flinkMetrics.put(clusterKey, metric) - val isMetricChanged = { - val preMetric = watchController.flinkMetrics.get(clusterKey) - preMetric == null || !preMetric.equalsPayload(metric) - } - if (isMetricChanged) { - eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric)) - } - case _ => + trackIds + .filter(_.executeMode == FlinkK8sExecuteMode.SESSION) + .map( + id => { + val future = Future(collectMetrics(id)) + future.onComplete(_.getOrElse(None) match { + case Some(metric) => + val clusterKey = id.toClusterKey + // update current flink cluster metrics on cache + watchController.flinkMetrics.put(clusterKey, metric) + val isMetricChanged = { + val preMetric = watchController.flinkMetrics.get(clusterKey) + preMetric == null || !preMetric.equalsPayload(metric) + } + if (isMetricChanged) { + eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric)) + } + case _ => + }) + future }) - future - }) // blocking until all future are completed or timeout is reached Try(Await.ready(Future.sequence(futures), conf.requestTimeoutSec seconds)).failed.map { _ => From f73a86c905bd1be16f2c490c4fb5aa25ff6dce96 Mon Sep 17 00:00:00 2001 From: benjobs Date: Sun, 28 Jan 2024 12:25:43 +0800 Subject: [PATCH 2/2] [Improve] doris-connector typo improvement --- .../flink/connector/doris/internal/DorisSinkWriter.java | 2 +- .../apache/streampark/connector/doris/conf/DorisConfig.scala | 2 +- .../connector/doris/conf/DorisSinkConfigOption.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java index a6f0d5317d..160b320478 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java @@ -166,7 +166,7 @@ public final synchronized void writeRecords(String database, String table, Strin final String bufferKey = String.format("%s.%s", database, table); final DorisSinkBufferEntry bufferEntity = bufferMap.computeIfAbsent( - bufferKey, k -> new DorisSinkBufferEntry(database, table, dorisConfig.lablePrefix())); + bufferKey, k -> new DorisSinkBufferEntry(database, table, dorisConfig.labelPrefix())); for (String record : records) { byte[] bts = record.getBytes(StandardCharsets.UTF_8); bufferEntity.addToBuffer(bts); diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala index a38f2cee36..f031282a68 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala @@ -58,7 +58,7 @@ class DorisConfig(parameters: Properties) { val 
sinkOfferTimeout: Long = sinkOption.sinkOfferTimeout.get() - val lablePrefix: String = sinkOption.lablePrefix.get() + val labelPrefix: String = sinkOption.labelPrefix.get() val semantic: String = sinkOption.semantic.get() diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala index a4940df84a..6ec700b321 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala @@ -126,8 +126,8 @@ class DorisSinkConfigOption(prefixStr: String, properties: Properties) extends S val maxRetries: ConfigOption[Int] = ConfigOption(key = "maxRetries", required = false, defaultValue = 1, classType = classOf[Int]) - val lablePrefix: ConfigOption[String] = ConfigOption( - key = "lablePrefix", + val labelPrefix: ConfigOption[String] = ConfigOption( + key = "labelPrefix", required = false, defaultValue = "doris", classType = classOf[String])