diff --git a/streampark-console/streampark-console-webapp/package.json b/streampark-console/streampark-console-webapp/package.json index 9e3ce9bacd..d7cfeb0fd3 100644 --- a/streampark-console/streampark-console-webapp/package.json +++ b/streampark-console/streampark-console-webapp/package.json @@ -65,7 +65,8 @@ "vue": "^3.3.4", "vue-i18n": "^9.2.2", "vue-router": "^4.2.4", - "vue-types": "^5.1.0" + "vue-types": "^5.1.0", + "js-yaml": "^4.1.0" }, "devDependencies": { "@iconify/json": "^2.2.89", @@ -118,7 +119,8 @@ "vite-plugin-theme": "^0.8.6", "vite-plugin-windicss": "^1.9.1", "vue-eslint-parser": "^9.3.1", - "vue-tsc": "^1.8.4" + "vue-tsc": "^1.8.4", + "js-yaml": "^4.1.0" }, "engines": { "node": ">=16.15.1 <= 18", diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts index 7c078d5e54..d7e72ff6c8 100644 --- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts +++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts @@ -151,6 +151,7 @@ export enum JobTypeEnum { JAR = 1, SQL = 2, PYFLINK = 3, + CDC = 4, } export enum ConfigTypeEnum { diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts index 79fa2669f5..59a191d42c 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts @@ -205,8 +205,10 @@ export default { editStreamPark: { success: 'update successful', flinkSqlRequired: 'Flink Sql is required', + yamlRequired: 'Yaml is required', appidCheck: 'appid can not be empty', sqlCheck: 'SQL check error', + yamlCheck: 'Yaml check error', }, operation: { edit: 'Edit Job', diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts index 11a8b417a7..5ba9592331 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts @@ -195,6 +195,8 @@ export default { flinkSqlRequired: 'Flink Sql 为必填项', appidCheck: 'appid 不能为空', sqlCheck: 'SQL 检查错误', + yamlRequired: 'Yaml 不能为空', + yamlCheck: 'Yaml 检查错误', }, operation: { edit: '编辑作业', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue index 4684f7ca0a..da1910f09f 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue @@ -263,13 +263,15 @@ async function handleAppCreate(formValue: Recordable) { try { submitLoading.value = true; - if (formValue.jobType == JobTypeEnum.SQL) { + if (formValue.jobType == JobTypeEnum.SQL || formValue.jobType == JobTypeEnum.CDC) { if (formValue.flinkSql == null || formValue.flinkSql.trim() === '') { - createMessage.warning(t('flink.app.editStreamPark.flinkSqlRequired')); + const errorMsg = formValue.jobType == JobTypeEnum.SQL ? t('flink.app.editStreamPark.flinkSqlRequired') : t('flink.app.editStreamPark.yamlRequired') + createMessage.warning(errorMsg); } else { const access = await flinkSql?.value?.handleVerifySql(); if (!access) { - createMessage.warning(t('flink.app.editStreamPark.sqlCheck')); + const errorMsg = formValue.jobType == JobTypeEnum.SQL ? t('flink.app.editStreamPark.sqlCheck') : t('flink.app.editStreamPark.yamlCheck') + createMessage.warning(errorMsg); throw new Error(access); } } @@ -316,6 +318,7 @@ v-model:value="model[field]" :versionId="model['versionId']" :suggestions="suggestions" + :jobType="Number(model['jobType'])" @preview="(value) => openReviewDrawer(true, { value, suggestions })" /> diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue index 2cc118ceb6..1180730bfd 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue @@ -164,13 +164,15 @@ async function handleAppUpdate(values) { try { submitLoading.value = true; - if (app.jobType == JobTypeEnum.SQL) { + if (app.jobType == JobTypeEnum.SQL || app.jobType == JobTypeEnum.CDC) { if (values.flinkSql == null || values.flinkSql.trim() === '') { - createMessage.warning(t('flink.app.editStreamPark.flinkSqlRequired')); + const errorMsg = app.jobType == JobTypeEnum.SQL ? t('flink.app.editStreamPark.flinkSqlRequired') : t('flink.app.editStreamPark.yamlRequired') + createMessage.warning(errorMsg); } else { const access = await flinkSql?.value?.handleVerifySql(); if (!access) { - createMessage.warning(t('flink.app.editStreamPark.sqlCheck')); + const errorMsg = app.jobType == JobTypeEnum.SQL ? t('flink.app.editStreamPark.sqlCheck') : t('flink.app.editStreamPark.yamlCheck') + createMessage.warning(errorMsg); throw new Error(access); } handleSubmitSQL(values); @@ -283,7 +285,7 @@ Object.assign(app, res); Object.assign(defaultOptions, JSON.parse(app.options || '{}')); - if (app.jobType == JobTypeEnum.SQL) { + if (app.jobType == JobTypeEnum.SQL || app.jobType == JobTypeEnum.CDC) { fetchFlinkHistory({ id: appId }).then((res) => { flinkSqlHistory.value = res; }); diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue index 99cea85a21..e33a45a828 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue @@ -310,6 +310,7 @@ :options="[ { label: 'JAR', value: JobTypeEnum.JAR }, { label: 'SQL', value: JobTypeEnum.SQL }, + { label: 'FlinkCDC', value: JobTypeEnum.CDC }, ]" /> @@ -349,6 +350,7 @@