Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] doris-connector typo improvement #3516

Merged
merged 2 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public Map<String, Serializable> dashboard(Long teamId) {
if (!teamId.equals(app.getTeamId())) {
continue;
}
// 1) only yarn-application, yarn-perjob mode
if (app.getJmMemory() != null) {
totalJmMemory += app.getJmMemory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ public void subscribeJobStatusChange(FlinkJobStatusChangeEvent event) {
TrackId trackId = event.trackId();
// get pre application record
Application app = applicationService.getById(trackId.appId());
if (app == null) {
if (app == null || FlinkAppState.isEndState(app.getState())) {
return;
}

// update application record
setByJobStatusCV(app, jobStatus);
applicationService.persistMetrics(app);

// email alerts when necessary
FlinkAppState state = FlinkAppState.of(app.getState());
// email alerts when necessary
if (FlinkAppState.FAILED.equals(state)
|| FlinkAppState.LOST.equals(state)
|| FlinkAppState.RESTARTING.equals(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ export default {
start: 'Start Application',
stop: 'Stop application',
savepoint: 'Trigger Savepoint',
fromSavepoint: 'From savepoint',
savepointTip: 'Restore the job from savepoint or latest checkpoint',
savepointInput:
'Select or manually specify the savepoint/checkpoint path, Same as:-allowNonRestoredState(-n)',
ignoreRestored: 'Ignore failed',
ignoreRestoredTip: 'ignore savepoint then cannot be restored,',
recheck: 'the associated project has changed and this job need to be rechecked',
changed: 'the application has changed.',
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ export default {
savepoint: '触发 Savepoint',
recheck: '关联的项目已更改,需要重新检查此作业',
changed: '应用程序已更改。',
fromSavepoint: 'Savepoint 恢复',
savepointTip: '作业从 savepoint 或 checkpoint 恢复状态',
savepointInput: '选择或者手动指定 savepoint/checkpoint 路径',
ignoreRestored: '跳过恢复失败',
ignoreRestoredTip: '当状态恢复失败时跳过错误,作业继续运行, 同参数:-allowNonRestoredState(-n)',
},
pod: {
choice: '选择',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@
/* delete configuration */
async function handleDeleteConf(record: Recordable) {
await fetchRemoveConf({ id: record.id });
reloadConf();
await reloadConf();
}

/* delete flink sql */
async function handleDeleteFlinkSql(record: Recordable) {
await fetchRemoveFlinkSql({ id: record.id });
reloadFlinkSql();
await reloadFlinkSql();
}

function handleCompare(record: Recordable) {
Expand Down Expand Up @@ -345,17 +345,17 @@
/* delete savePoint */
async function handleDeleteSavePoint(record: Recordable) {
await fetchRemoveSavePoint({ id: record.id });
reloadSavePoint();
await reloadSavePoint();
}

async function handleDeleteBackup(record: Recordable) {
await fetchRemoveBackup(record.id);
reloadBackup();
await reloadBackup();
}

async function handleDeleteOperationLog(record: Recordable) {
await fetchDeleteOperationLog(record.id);
reloadOperationLog();
await reloadOperationLog();
}

/* copy path */
Expand All @@ -377,16 +377,51 @@
</script>
<template>
<div>
<Tabs :defaultActiveKey="1" class="mt-15px" :animated="false" :tab-bar-gutter="0">
<TabPane key="1" tab="Option" force-render>
<Tabs :defaultActiveKey="1" class="mt-15px" :tab-bar-gutter="0">
<TabPane key="1" :tab="t('flink.app.detail.detailTab.detailTabName.operationLog')">
<BasicTable @register="registerLogsTable">
<template #bodyCell="{ column, record }">
<template v-if="column.dataIndex === 'optionName'">
<Tag color="blue" v-if="record.optionName === OperationEnum.RELEASE"> Release </Tag>
<Tag color="green" v-if="record.optionName === OperationEnum.START"> Start </Tag>
<Tag color="cyan" v-if="record.optionName === OperationEnum.SAVEPOINT">
Savepoint
</Tag>
<Tag color="orange" v-if="record.optionName === OperationEnum.CANCEL"> Cancel </Tag>
</template>
<template v-if="column.dataIndex === 'yarnAppId'">
<a type="link" @click="handleYarnUrl(record.yarnAppId)" target="_blank">
{{ record.yarnAppId }}
</a>
</template>
<template v-if="column.dataIndex === 'jobManagerUrl'">
<a type="link" :href="record.jobManagerUrl" target="_blank">
{{ record.jobManagerUrl }}
</a>
</template>
<template v-if="column.dataIndex === 'optionTime'">
<Icon icon="ant-design:clock-circle-outlined" />
{{ record.optionTime }}
</template>
<template v-if="column.dataIndex === 'success'">
<Tag class="bold-tag" color="#52c41a" v-if="record.success"> SUCCESS </Tag>
<Tag class="bold-tag" color="#f5222d" v-else> FAILED </Tag>
</template>
<template v-if="column.dataIndex === 'operation'">
<TableAction :actions="getOperationLogAction(record)" />
</template>
</template>
</BasicTable>
</TabPane>
<TabPane key="2" :tab="t('flink.app.detail.detailTab.detailTabName.option')">
<Descriptions bordered size="middle" layout="vertical">
<DescriptionItem v-for="(v, k) in JSON.parse(app.options || '{}')" :key="k" :label="k">
{{ v }}
</DescriptionItem>
</Descriptions>
</TabPane>
<TabPane
key="2"
key="3"
:tab="t('flink.app.detail.detailTab.detailTabName.configuration')"
v-if="app && app.appType === AppTypeEnum.STREAMPARK_FLINK && tabConf.showConf"
>
Expand Down Expand Up @@ -424,7 +459,7 @@
</BasicTable>
</TabPane>
<TabPane
key="3"
key="4"
:tab="t('flink.app.detail.detailTab.detailTabName.flinkSql')"
v-if="app.jobType === JobTypeEnum.SQL"
>
Expand Down Expand Up @@ -472,7 +507,7 @@
</BasicTable>
</TabPane>
<TabPane
key="4"
key="5"
:tab="t('flink.app.detail.detailTab.detailTabName.savepoint')"
v-if="tabConf.showSaveOption"
>
Expand Down Expand Up @@ -500,7 +535,7 @@
</BasicTable>
</TabPane>
<TabPane
key="5"
key="6"
:tab="t('flink.app.detail.detailTab.detailTabName.backup')"
v-if="tabConf.showBackup"
>
Expand All @@ -517,45 +552,6 @@
</template>
</BasicTable>
</TabPane>
<TabPane
key="6"
:tab="t('flink.app.detail.detailTab.detailTabName.operationLog')"
v-if="tabConf.showOptionLog"
>
<BasicTable @register="registerLogsTable">
<template #bodyCell="{ column, record }">
<template v-if="column.dataIndex === 'optionName'">
<Tag color="blue" v-if="record.optionName === OperationEnum.RELEASE"> Release </Tag>
<Tag color="green" v-if="record.optionName === OperationEnum.START"> Start </Tag>
<Tag color="cyan" v-if="record.optionName === OperationEnum.SAVEPOINT">
Savepoint
</Tag>
<Tag color="orange" v-if="record.optionName === OperationEnum.CANCEL"> Cancel </Tag>
</template>
<template v-if="column.dataIndex === 'yarnAppId'">
<a type="link" @click="handleYarnUrl(record.yarnAppId)" target="_blank">
{{ record.yarnAppId }}
</a>
</template>
<template v-if="column.dataIndex === 'jobManagerUrl'">
<a type="link" :href="record.jobManagerUrl" target="_blank">
{{ record.jobManagerUrl }}
</a>
</template>
<template v-if="column.dataIndex === 'optionTime'">
<Icon icon="ant-design:clock-circle-outlined" />
{{ record.optionTime }}
</template>
<template v-if="column.dataIndex === 'success'">
<Tag class="bold-tag" color="#52c41a" v-if="record.success"> SUCCESS </Tag>
<Tag class="bold-tag" color="#f5222d" v-else> FAILED </Tag>
</template>
<template v-if="column.dataIndex === 'operation'">
<TableAction :actions="getOperationLogAction(record)" />
</template>
</template>
</BasicTable>
</TabPane>
</Tabs>

<CompareModal @register="registerCompare" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,47 +59,36 @@
schemas: [
{
field: 'startSavePointed',
label: 'from savepoint',
label: t('flink.app.view.fromSavepoint'),
component: 'Switch',
componentProps: {
checkedChildren: 'ON',
unCheckedChildren: 'OFF',
},
defaultValue: true,
afterItem: () =>
h(
'span',
{ class: 'conf-switch' },
'restore the application from savepoint or latest checkpoint',
),
afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.savepointTip')),
},
{
field: 'startSavePoint',
label: 'savepoint',
label: 'Savepoint',
component:
receiveData.historySavePoint && receiveData.historySavePoint.length > 0
? 'Select'
: 'Input',
afterItem: () =>
h(
'span',
{ class: 'conf-switch' },
'restore the application from savepoint or latest checkpoint',
),
afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.savepointInput')),
slot: 'savepoint',
ifShow: ({ values }) => values.startSavePointed,
required: true,
},
{
field: 'allowNonRestoredState',
label: 'ignore restored',
label: t('flink.app.view.ignoreRestored'),
component: 'Switch',
componentProps: {
checkedChildren: 'ON',
unCheckedChildren: 'OFF',
},
afterItem: () =>
h('span', { class: 'conf-switch' }, 'ignore savepoint then cannot be restored'),
afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.ignoreRestoredTip')),
defaultValue: false,
ifShow: ({ values }) => values.startSavePointed,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public final synchronized void writeRecords(String database, String table, Strin
final String bufferKey = String.format("%s.%s", database, table);
final DorisSinkBufferEntry bufferEntity =
bufferMap.computeIfAbsent(
bufferKey, k -> new DorisSinkBufferEntry(database, table, dorisConfig.lablePrefix()));
bufferKey, k -> new DorisSinkBufferEntry(database, table, dorisConfig.labelPrefix()));
for (String record : records) {
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
bufferEntity.addToBuffer(bts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class DorisConfig(parameters: Properties) {

val sinkOfferTimeout: Long = sinkOption.sinkOfferTimeout.get()

val lablePrefix: String = sinkOption.lablePrefix.get()
val labelPrefix: String = sinkOption.labelPrefix.get()

val semantic: String = sinkOption.semantic.get()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class DorisSinkConfigOption(prefixStr: String, properties: Properties) extends S
val maxRetries: ConfigOption[Int] =
ConfigOption(key = "maxRetries", required = false, defaultValue = 1, classType = classOf[Int])

val lablePrefix: ConfigOption[String] = ConfigOption(
key = "lablePrefix",
val labelPrefix: ConfigOption[String] = ConfigOption(
key = "labelPrefix",
required = false,
defaultValue = "doris",
classType = classOf[String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.streampark.flink.kubernetes.watcher

import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV, TrackId}

Expand Down Expand Up @@ -81,25 +82,27 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
)
// retrieve flink metrics in thread pool
val futures: Set[Future[Option[FlinkMetricCV]]] =
trackIds.map(
id => {
val future = Future(collectMetrics(id))
future.onComplete(_.getOrElse(None) match {
case Some(metric) =>
val clusterKey = id.toClusterKey
// update current flink cluster metrics on cache
watchController.flinkMetrics.put(clusterKey, metric)
val isMetricChanged = {
val preMetric = watchController.flinkMetrics.get(clusterKey)
preMetric == null || !preMetric.equalsPayload(metric)
}
if (isMetricChanged) {
eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
}
case _ =>
trackIds
.filter(_.executeMode == FlinkK8sExecuteMode.SESSION)
.map(
id => {
val future = Future(collectMetrics(id))
future.onComplete(_.getOrElse(None) match {
case Some(metric) =>
val clusterKey = id.toClusterKey
// update current flink cluster metrics on cache
watchController.flinkMetrics.put(clusterKey, metric)
val isMetricChanged = {
val preMetric = watchController.flinkMetrics.get(clusterKey)
preMetric == null || !preMetric.equalsPayload(metric)
}
if (isMetricChanged) {
eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
}
case _ =>
})
future
})
future
})
// blocking until all future are completed or timeout is reached
Try(Await.ready(Future.sequence(futures), conf.requestTimeoutSec seconds)).failed.map {
_ =>
Expand Down
Loading