Skip to content

Commit

Permalink
[Feature] Add Support for encrypting config fields
Browse files Browse the repository at this point in the history
While writing intermediate config to submit job via seatunnel client,
we are required to encrypt certain fields like password, auth, username

By Default no encryption would be done, to enable the encryption
provide the following properties

```
seatunnel-web:
  datasource:
    encryption:
      type: your_encryption
      keys-to-encrypt:
        - password
        - auth
```

ref: https://seatunnel.apache.org/docs/2.3.9/connector-v2/Config-Encryption-Decryption/

Co-authored-by: BilwaST <[email protected]>
Co-authored-by: Shashwat Tiwari <[email protected]>
  • Loading branch information
shashwatsai and BilwaST committed Feb 10, 2025
1 parent 20ac01b commit a30d3d1
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.app.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import lombok.Data;

import java.util.HashSet;
import java.util.Set;

import static org.apache.seatunnel.app.common.Constants.DEFAULT;

@Data
@Configuration
@ConfigurationProperties(prefix = "seatunnel-web.datasource.encryption")
public class SeatunnelEncryptionConfig {
private String type = DEFAULT;
private Set<String> keysToEncrypt = new HashSet<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.seatunnel.app.service.ITableSchemaService;
import org.apache.seatunnel.app.thirdparty.datasource.DataSourceClientFactory;
import org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper;
import org.apache.seatunnel.app.utils.ConfigShadeUtil;
import org.apache.seatunnel.app.utils.ServletUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class DatasourceServiceImpl extends SeatunnelBaseServiceImpl

protected static final String DEFAULT_DATASOURCE_PLUGIN_VERSION = "1.0.0";

@Autowired private ConfigShadeUtil configShadeUtil;

@Override
public String createDatasource(
String datasourceName,
Expand All @@ -114,6 +117,7 @@ public String createDatasource(
throw new SeatunnelException(
SeatunnelErrorEnum.DATASOURCE_PRAM_NOT_ALLOWED_NULL, "datasourceConfig");
}
configShadeUtil.encryptData(datasourceConfig);
String datasourceConfigStr = JsonUtils.toJsonString(datasourceConfig);
Datasource datasource =
Datasource.builder()
Expand Down Expand Up @@ -171,6 +175,7 @@ public boolean updateDatasource(
datasource.setUpdateTime(new Date());
datasource.setDescription(description);
if (MapUtils.isNotEmpty(datasourceConfig)) {
configShadeUtil.encryptData(datasourceConfig);
String configJson = JsonUtils.toJsonString(datasourceConfig);
datasource.setDatasourceConfig(configJson);
}
Expand Down Expand Up @@ -226,6 +231,7 @@ public boolean testDatasourceConnectionAble(Long datasourceId) {
String configJson = datasource.getDatasourceConfig();
Map<String, String> datasourceConfig =
JsonUtils.toMap(configJson, String.class, String.class);
configShadeUtil.decryptData(datasourceConfig);
String pluginName = datasource.getPluginName();
return DataSourceClientFactory.getDataSourceClient()
.checkDataSourceConnectivity(pluginName, datasourceConfig);
Expand Down Expand Up @@ -274,6 +280,7 @@ public List<String> queryDatabaseByDatasourceName(String datasourceName) {
Map<String, String> datasourceConfig =
JsonUtils.toMap(config, String.class, String.class);

configShadeUtil.decryptData(datasourceConfig);
return DataSourceClientFactory.getDataSourceClient()
.getDatabases(pluginName, datasourceConfig);
}
Expand Down Expand Up @@ -305,6 +312,7 @@ public List<String> queryTableNames(
options.put("filterName", filterName);
String pluginName = datasource.getPluginName();
if (BooleanUtils.isNotTrue(checkIsSupportVirtualTable(pluginName))) {
configShadeUtil.decryptData(datasourceConfig);
return DataSourceClientFactory.getDataSourceClient()
.getTables(pluginName, databaseName, datasourceConfig, options);
}
Expand All @@ -324,6 +332,7 @@ public List<String> queryTableNames(String datasourceName, String databaseName)
Map<String, String> options = new HashMap<>();
String pluginName = datasource.getPluginName();
if (BooleanUtils.isNotTrue(checkIsSupportVirtualTable(pluginName))) {
configShadeUtil.decryptData(datasourceConfig);
return DataSourceClientFactory.getDataSourceClient()
.getTables(pluginName, databaseName, datasourceConfig, options);
}
Expand All @@ -345,6 +354,7 @@ public List<TableField> queryTableSchema(
ITableSchemaService tableSchemaService =
(ITableSchemaService) applicationContext.getBean("tableSchemaServiceImpl");
if (BooleanUtils.isNotTrue(checkIsSupportVirtualTable(pluginName))) {
configShadeUtil.decryptData(datasourceConfig);
List<TableField> tableFields =
DataSourceClientFactory.getDataSourceClient()
.getTableFields(pluginName, datasourceConfig, databaseName, tableName);
Expand Down Expand Up @@ -434,6 +444,7 @@ public PageInfo<DatasourceRes> queryDatasourceList(
datasource.getDatasourceConfig(),
String.class,
String.class);
configShadeUtil.decryptData(datasourceConfig);
datasourceRes.setDatasourceConfig(datasourceConfig);
datasourceRes.setCreateUserId(datasource.getCreateUserId());
datasourceRes.setUpdateUserId(datasource.getUpdateUserId());
Expand Down Expand Up @@ -503,7 +514,10 @@ public Map<String, String> queryDatasourceConfigById(String datasourceId) {
throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId);
}
String configJson = datasource.getDatasourceConfig();
return JsonUtils.toMap(configJson, String.class, String.class);
Map<String, String> datasourceConfig =
JsonUtils.toMap(configJson, String.class, String.class);
configShadeUtil.decryptData(datasourceConfig);
return datasourceConfig;
}

@Override
Expand Down Expand Up @@ -591,7 +605,7 @@ public DatasourceDetailRes queryDatasourceDetailByDatasourceName(String datasour
return getDatasourceDetailRes(datasource);
}

private static DatasourceDetailRes getDatasourceDetailRes(Datasource datasource) {
private DatasourceDetailRes getDatasourceDetailRes(Datasource datasource) {
DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes();
datasourceDetailRes.setId(datasource.getId().toString());
datasourceDetailRes.setDatasourceName(datasource.getDatasourceName());
Expand All @@ -603,6 +617,7 @@ private static DatasourceDetailRes getDatasourceDetailRes(Datasource datasource)

Map<String, String> datasourceConfig =
JsonUtils.toMap(datasource.getDatasourceConfig(), String.class, String.class);
configShadeUtil.decryptData(datasourceConfig);
// convert option rule
datasourceDetailRes.setDatasourceConfig(datasourceConfig);
return datasourceDetailRes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.seatunnel.api.env.ParsingMode;
import org.apache.seatunnel.app.bean.connector.ConnectorCache;
import org.apache.seatunnel.app.config.ConnectorDataSourceMapperConfig;
import org.apache.seatunnel.app.config.SeatunnelEncryptionConfig;
import org.apache.seatunnel.app.dal.dao.IJobDefinitionDao;
import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
import org.apache.seatunnel.app.dal.dao.IJobLineDao;
Expand Down Expand Up @@ -60,6 +61,7 @@
import org.apache.seatunnel.app.service.IVirtualTableService;
import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcherUtils;
import org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcherUtils;
import org.apache.seatunnel.app.utils.ConfigShadeUtil;
import org.apache.seatunnel.app.utils.JobUtils;
import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
import org.apache.seatunnel.app.utils.ServletUtils;
Expand All @@ -74,6 +76,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -124,6 +127,10 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl

@Resource private IJobMetricsService jobMetricsService;

@Autowired private ConfigShadeUtil configShadeUtil;

@Autowired private SeatunnelEncryptionConfig seatunnelEncryptionConfig;

@Override
public JobExecutorRes createExecuteResource(
@NonNull Long jobDefineId, JobExecParam executeParam) {
Expand Down Expand Up @@ -324,6 +331,10 @@ public String generateJobConfig(
if (sinkMap.size() > 0) {
sinks = getConnectorConfig(sinkMap);
}
envConfig =
envConfig.withValue(
"shade.identifier",
ConfigValueFactory.fromAnyRef(seatunnelEncryptionConfig.getType()));
String env =
envConfig
.root()
Expand Down Expand Up @@ -575,6 +586,7 @@ private Config parseConfigWithOptionRule(
String connectorType,
Map<String, String> config,
OptionRule optionRule) {
configShadeUtil.encryptData(config);
return parseConfigWithOptionRule(
pluginType, connectorType, ConfigFactory.parseMap(config), optionRule);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.app.utils;

import org.apache.seatunnel.app.config.SeatunnelEncryptionConfig;
import org.apache.seatunnel.core.starter.utils.ConfigShadeUtils;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;

import org.apache.commons.lang3.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

import java.util.Map;

@Slf4j
@Component
public class ConfigShadeUtil {

@Autowired private SeatunnelEncryptionConfig seatunnelEncryptionConfig;

public void encryptData(Map<String, String> datasourceConfig) {
for (String key : seatunnelEncryptionConfig.getKeysToEncrypt()) {
String value = datasourceConfig.get(key);
if (StringUtils.isNotEmpty(value)) {
try {
String processedValue =
ConfigShadeUtils.encryptOption(
seatunnelEncryptionConfig.getType(), value);
datasourceConfig.replace(key, processedValue);
} catch (IllegalArgumentException ex) {
log.error("encryption for key {} failed", key);
throw new SeatunnelException(
SeatunnelErrorEnum.ERROR_CONFIG,
String.format(
"encryption failed for key: %s, check if the keys were persisted in expected format",
key),
ex);
}
}
}
}

public void decryptData(Map<String, String> datasourceConfig) {
for (String key : seatunnelEncryptionConfig.getKeysToEncrypt()) {
String value = datasourceConfig.get(key);
if (StringUtils.isNotEmpty(value)) {
try {
String processedValue =
ConfigShadeUtils.decryptOption(
seatunnelEncryptionConfig.getType(), value);
datasourceConfig.replace(key, processedValue);
} catch (IllegalArgumentException ex) {
log.error("decryption for key {} failed", key);
throw new SeatunnelException(
SeatunnelErrorEnum.ERROR_CONFIG,
String.format(
"decryption failed for key: %s, check if the keys were persisted in expected format",
key),
ex);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ jwt:
secretKey:
algorithm: HS256


seatunnel-web:
datasource:
encryption:
type: default
keys-to-encrypt:
- password
- auth
---
spring:
config:
Expand Down
8 changes: 8 additions & 0 deletions seatunnel-web-it/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ jwt:
secretKey: https://github.com/apache/seatunnel
algorithm: HS256

seatunnel-web:
datasource:
encryption:
type: default
keys-to-encrypt:
- password
- auth

---
spring:
application:
Expand Down

0 comments on commit a30d3d1

Please sign in to comment.