diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Constants.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Constants.java index 886c1f70e..22c018341 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Constants.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Constants.java @@ -215,6 +215,7 @@ private Constants() { public static final String COMMON_TASK_TYPE = "common"; public static final String DEFAULT = "default"; + public static final String ENCRYPTION_TYPE_NONE = "none"; public static final String PASSWORD = "password"; public static final String XXXXXX = "******"; public static final String NULL = "NULL"; @@ -658,4 +659,5 @@ private Constants() { public static final String AUTHENTICATION_PROVIDER_LDAP = "LDAP"; public static final String AUTHENTICATION_PROVIDER_DB = "DB"; + public static final String ENCRYPTION_IDENTIFIER_KEY = "shade.identifier"; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/EncryptionConfig.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/EncryptionConfig.java new file mode 100644 index 000000000..4e887321a --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/EncryptionConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.seatunnel.app.common.Constants.ENCRYPTION_TYPE_NONE; + +@Data +@Configuration +@ConfigurationProperties(prefix = "seatunnel-web.datasource.encryption") +public class EncryptionConfig { + private String type = ENCRYPTION_TYPE_NONE; + private Set keysToEncrypt = new HashSet<>(); +} diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java index 12b1a4f1c..ce9cbc655 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.seatunnel.app.service.ITableSchemaService; import org.apache.seatunnel.app.thirdparty.datasource.DataSourceClientFactory; import org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper; +import org.apache.seatunnel.app.utils.ConfigShadeUtil; import org.apache.seatunnel.app.utils.ServletUtils; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; @@ -94,6 +95,8 @@ public class DatasourceServiceImpl extends SeatunnelBaseServiceImpl protected static final String DEFAULT_DATASOURCE_PLUGIN_VERSION = "1.0.0"; + @Autowired private ConfigShadeUtil configShadeUtil; + @Override public String createDatasource( String datasourceName, @@ -114,6 +117,7 @@ public String createDatasource( throw new SeatunnelException( SeatunnelErrorEnum.DATASOURCE_PRAM_NOT_ALLOWED_NULL, "datasourceConfig"); } + configShadeUtil.encryptData(datasourceConfig); String datasourceConfigStr = JsonUtils.toJsonString(datasourceConfig); Datasource datasource = Datasource.builder() @@ -171,6 +175,7 @@ public boolean updateDatasource( datasource.setUpdateTime(new Date()); datasource.setDescription(description); if (MapUtils.isNotEmpty(datasourceConfig)) { + configShadeUtil.encryptData(datasourceConfig); String configJson = JsonUtils.toJsonString(datasourceConfig); datasource.setDatasourceConfig(configJson); } @@ -226,6 +231,7 @@ public boolean testDatasourceConnectionAble(Long datasourceId) { String configJson = datasource.getDatasourceConfig(); Map datasourceConfig = JsonUtils.toMap(configJson, String.class, String.class); + configShadeUtil.decryptData(datasourceConfig); String pluginName = datasource.getPluginName(); return DataSourceClientFactory.getDataSourceClient() .checkDataSourceConnectivity(pluginName, datasourceConfig); @@ -274,6 +280,7 @@ public List queryDatabaseByDatasourceName(String datasourceName) { Map datasourceConfig = JsonUtils.toMap(config, String.class, String.class); + configShadeUtil.decryptData(datasourceConfig); return DataSourceClientFactory.getDataSourceClient() .getDatabases(pluginName, datasourceConfig); } @@ -305,6 +312,7 @@ public List queryTableNames( options.put("filterName", filterName); String pluginName = datasource.getPluginName(); if (BooleanUtils.isNotTrue(checkIsSupportVirtualTable(pluginName))) { + configShadeUtil.decryptData(datasourceConfig); return DataSourceClientFactory.getDataSourceClient() .getTables(pluginName, databaseName, datasourceConfig, options); } @@ -324,6 +332,7 @@ public List queryTableNames(String datasourceName, String databaseName) Map options = new HashMap<>(); String pluginName = datasource.getPluginName(); if (BooleanUtils.isNotTrue(checkIsSupportVirtualTable(pluginName))) { + configShadeUtil.decryptData(datasourceConfig); return DataSourceClientFactory.getDataSourceClient() .getTables(pluginName, databaseName, datasourceConfig, options); } @@ -345,6 +354,7 @@ public List queryTableSchema( ITableSchemaService tableSchemaService = (ITableSchemaService) applicationContext.getBean("tableSchemaServiceImpl"); if (BooleanUtils.isNotTrue(checkIsSupportVirtualTable(pluginName))) { + configShadeUtil.decryptData(datasourceConfig); List tableFields = DataSourceClientFactory.getDataSourceClient() .getTableFields(pluginName, datasourceConfig, databaseName, tableName); @@ -434,6 +444,7 @@ public PageInfo queryDatasourceList( datasource.getDatasourceConfig(), String.class, String.class); + configShadeUtil.decryptData(datasourceConfig); datasourceRes.setDatasourceConfig(datasourceConfig); datasourceRes.setCreateUserId(datasource.getCreateUserId()); datasourceRes.setUpdateUserId(datasource.getUpdateUserId()); @@ -503,7 +514,10 @@ public Map queryDatasourceConfigById(String datasourceId) { throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId); } String configJson = datasource.getDatasourceConfig(); - return JsonUtils.toMap(configJson, String.class, String.class); + Map datasourceConfig = + JsonUtils.toMap(configJson, String.class, String.class); + configShadeUtil.decryptData(datasourceConfig); + return datasourceConfig; } @Override @@ -591,7 +605,7 @@ public DatasourceDetailRes queryDatasourceDetailByDatasourceName(String datasour return getDatasourceDetailRes(datasource); } - private static DatasourceDetailRes getDatasourceDetailRes(Datasource datasource) { + private DatasourceDetailRes getDatasourceDetailRes(Datasource datasource) { DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes(); datasourceDetailRes.setId(datasource.getId().toString()); datasourceDetailRes.setDatasourceName(datasource.getDatasourceName()); @@ -603,6 +617,7 @@ private static DatasourceDetailRes getDatasourceDetailRes(Datasource datasource) Map datasourceConfig = JsonUtils.toMap(datasource.getDatasourceConfig(), String.class, String.class); + configShadeUtil.decryptData(datasourceConfig); // convert option rule datasourceDetailRes.setDatasourceConfig(datasourceConfig); return datasourceDetailRes; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index e7883395d..cd453048c 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -31,6 +31,7 @@ import org.apache.seatunnel.api.env.ParsingMode; import org.apache.seatunnel.app.bean.connector.ConnectorCache; import org.apache.seatunnel.app.config.ConnectorDataSourceMapperConfig; +import org.apache.seatunnel.app.config.EncryptionConfig; import org.apache.seatunnel.app.dal.dao.IJobDefinitionDao; import org.apache.seatunnel.app.dal.dao.IJobInstanceDao; import org.apache.seatunnel.app.dal.dao.IJobLineDao; @@ -60,6 +61,7 @@ import org.apache.seatunnel.app.service.IVirtualTableService; import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcherUtils; import org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcherUtils; +import org.apache.seatunnel.app.utils.ConfigShadeUtil; import org.apache.seatunnel.app.utils.JobUtils; import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil; import org.apache.seatunnel.app.utils.ServletUtils; @@ -74,6 +76,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonProcessingException; @@ -95,6 +98,8 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.seatunnel.app.common.Constants.ENCRYPTION_IDENTIFIER_KEY; +import static org.apache.seatunnel.app.common.Constants.ENCRYPTION_TYPE_NONE; import static org.apache.seatunnel.app.utils.TaskOptionUtils.getTransformOption; @Service @@ -124,6 +129,10 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl @Resource private IJobMetricsService jobMetricsService; + @Autowired private ConfigShadeUtil configShadeUtil; + + @Autowired private EncryptionConfig encryptionConfig; + @Override public JobExecutorRes createExecuteResource( @NonNull Long jobDefineId, JobExecParam executeParam) { @@ -324,6 +333,14 @@ public String generateJobConfig( if (sinkMap.size() > 0) { sinks = getConnectorConfig(sinkMap); } + + if (!encryptionConfig.getType().equals(ENCRYPTION_TYPE_NONE)) { + envConfig = + envConfig.withValue( + ENCRYPTION_IDENTIFIER_KEY, + ConfigValueFactory.fromAnyRef(encryptionConfig.getType())); + } + String env = envConfig .root() @@ -575,6 +592,7 @@ private Config parseConfigWithOptionRule( String connectorType, Map config, OptionRule optionRule) { + configShadeUtil.encryptData(config); return parseConfigWithOptionRule( pluginType, connectorType, ConfigFactory.parseMap(config), optionRule); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/ConfigShadeUtil.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/ConfigShadeUtil.java new file mode 100644 index 000000000..ae2db7724 --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/ConfigShadeUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.utils; + +import org.apache.seatunnel.app.config.EncryptionConfig; +import org.apache.seatunnel.core.starter.utils.ConfigShadeUtils; +import org.apache.seatunnel.server.common.SeatunnelErrorEnum; +import org.apache.seatunnel.server.common.SeatunnelException; + +import org.apache.commons.lang3.StringUtils; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +import static org.apache.seatunnel.app.common.Constants.ENCRYPTION_TYPE_NONE; + +@Slf4j +@Component +public class ConfigShadeUtil { + + @Autowired private EncryptionConfig encryptionConfig; + + public void encryptData(Map datasourceConfig) { + if (encryptionConfig.getType().equals(ENCRYPTION_TYPE_NONE)) { + return; + } + for (String key : encryptionConfig.getKeysToEncrypt()) { + String value = datasourceConfig.get(key); + if (StringUtils.isNotEmpty(value)) { + try { + String processedValue = + ConfigShadeUtils.encryptOption(encryptionConfig.getType(), value); + datasourceConfig.replace(key, processedValue); + } catch (IllegalArgumentException ex) { + log.error("encryption for key {} failed", key); + throw new SeatunnelException( + SeatunnelErrorEnum.ERROR_CONFIG, + String.format( + "encryption failed for key: %s, check if the keys were persisted in expected format", + key), + ex); + } + } + } + } + + public void decryptData(Map datasourceConfig) { + if (encryptionConfig.getType().equals(ENCRYPTION_TYPE_NONE)) { + return; + } + for (String key : encryptionConfig.getKeysToEncrypt()) { + String value = datasourceConfig.get(key); + if (StringUtils.isNotEmpty(value)) { + try { + String processedValue = + ConfigShadeUtils.decryptOption(encryptionConfig.getType(), value); + datasourceConfig.replace(key, processedValue); + } catch (IllegalArgumentException ex) { + log.error("decryption for key {} failed", key); + throw new SeatunnelException( + SeatunnelErrorEnum.ERROR_CONFIG, + String.format( + "decryption failed for key: %s, check if the keys were persisted in expected format", + key), + ex); + } + } + } + } +} diff --git a/seatunnel-server/seatunnel-app/src/main/resources/application.yml b/seatunnel-server/seatunnel-app/src/main/resources/application.yml index 5a6e3a069..a90a6366a 100644 --- a/seatunnel-server/seatunnel-app/src/main/resources/application.yml +++ b/seatunnel-server/seatunnel-app/src/main/resources/application.yml @@ -52,7 +52,13 @@ jwt: secretKey: algorithm: HS256 - +seatunnel-web: + datasource: + encryption: + type: none + keys-to-encrypt: + - password + - auth --- spring: config: diff --git a/seatunnel-web-it/src/test/resources/application.yml b/seatunnel-web-it/src/test/resources/application.yml index ef285655c..752fbc7b7 100644 --- a/seatunnel-web-it/src/test/resources/application.yml +++ b/seatunnel-web-it/src/test/resources/application.yml @@ -48,6 +48,14 @@ jwt: secretKey: https://github.com/apache/seatunnel algorithm: HS256 +seatunnel-web: + datasource: + encryption: + type: none + keys-to-encrypt: + - password + - auth + --- spring: application: