Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.seatunnel.app.service.ITableSchemaService;
import org.apache.seatunnel.app.thirdparty.datasource.DataSourceClientFactory;
import org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper;
import org.apache.seatunnel.app.utils.ConfigShadeUtil;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
Expand All @@ -53,6 +54,7 @@
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -93,6 +95,9 @@ public class DatasourceServiceImpl extends SeatunnelBaseServiceImpl

protected static final String DEFAULT_DATASOURCE_PLUGIN_VERSION = "1.0.0";

@Value("${datasource.encryption.type:default}")
private String datasourceEncryptionType;

@Override
public String createDatasource(
Integer userId,
Expand All @@ -113,6 +118,7 @@ public String createDatasource(
throw new SeatunnelException(
SeatunnelErrorEnum.DATASOURCE_PRAM_NOT_ALLOWED_NULL, "datasourceConfig");
}
ConfigShadeUtil.encryptData(datasourceConfig, datasourceEncryptionType);
String datasourceConfigStr = JsonUtils.toJsonString(datasourceConfig);
Datasource datasource =
Datasource.builder()
Expand Down Expand Up @@ -171,6 +177,7 @@ public boolean updateDatasource(
datasource.setUpdateTime(new Date());
datasource.setDescription(description);
if (MapUtils.isNotEmpty(datasourceConfig)) {
ConfigShadeUtil.encryptData(datasourceConfig, datasourceEncryptionType);
String configJson = JsonUtils.toJsonString(datasourceConfig);
datasource.setDatasourceConfig(configJson);
}
Expand Down Expand Up @@ -208,6 +215,7 @@ public boolean testDatasourceConnectionAble(
String pluginVersion,
Map<String, String> datasourceConfig) {
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_TEST_CONNECT, userId);
ConfigShadeUtil.decryptData(datasourceConfig, datasourceEncryptionType);
return DataSourceClientFactory.getDataSourceClient()
.checkDataSourceConnectivity(pluginName, datasourceConfig);
}
Expand All @@ -227,6 +235,7 @@ public boolean testDatasourceConnectionAble(Integer userId, Long datasourceId) {
String configJson = datasource.getDatasourceConfig();
Map<String, String> datasourceConfig =
JsonUtils.toMap(configJson, String.class, String.class);
ConfigShadeUtil.decryptData(datasourceConfig, datasourceEncryptionType);
String pluginName = datasource.getPluginName();
return DataSourceClientFactory.getDataSourceClient()
.checkDataSourceConnectivity(pluginName, datasourceConfig);
Expand Down Expand Up @@ -276,6 +285,7 @@ public List<String> queryDatabaseByDatasourceName(String datasourceName) {
Map<String, String> datasourceConfig =
JsonUtils.toMap(config, String.class, String.class);

ConfigShadeUtil.decryptData(datasourceConfig, datasourceEncryptionType);
return DataSourceClientFactory.getDataSourceClient()
.getDatabases(pluginName, datasourceConfig);
}
Expand Down Expand Up @@ -435,6 +445,7 @@ public PageInfo<DatasourceRes> queryDatasourceList(
datasource.getDatasourceConfig(),
String.class,
String.class);
ConfigShadeUtil.decryptData(datasourceConfig, datasourceEncryptionType);
datasourceRes.setDatasourceConfig(datasourceConfig);
datasourceRes.setCreateUserId(datasource.getCreateUserId());
datasourceRes.setUpdateUserId(datasource.getUpdateUserId());
Expand Down Expand Up @@ -504,7 +515,10 @@ public Map<String, String> queryDatasourceConfigById(String datasourceId) {
throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId);
}
String configJson = datasource.getDatasourceConfig();
return JsonUtils.toMap(configJson, String.class, String.class);
Map<String, String> datasourceConfig =
JsonUtils.toMap(configJson, String.class, String.class);
ConfigShadeUtil.decryptData(datasourceConfig, datasourceEncryptionType);
return datasourceConfig;
}

@Override
Expand Down Expand Up @@ -604,6 +618,7 @@ private static DatasourceDetailRes getDatasourceDetailRes(Datasource datasource)

Map<String, String> datasourceConfig =
JsonUtils.toMap(datasource.getDatasourceConfig(), String.class, String.class);
ConfigShadeUtil.decryptData(datasourceConfig, datasourceEncryptionType);
// convert option rule
datasourceDetailRes.setDatasourceConfig(datasourceConfig);
return datasourceDetailRes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.app.service.impl;

import org.apache.seatunnel.app.utils.ConfigShadeUtil;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -123,6 +125,9 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl

@Resource private IJobMetricsService jobMetricsService;

@Value("${datasource.encryption.type:default}")
private String datasourceEncryptionType;

@Override
public JobExecutorRes createExecuteResource(
@NonNull Integer userId, @NonNull Long jobDefineId, JobExecParam executeParam) {
Expand Down Expand Up @@ -326,6 +331,7 @@ public String generateJobConfig(
.setJson(false)
.setComments(false)
.setOriginComments(false));
env = env + "\"shade.identifier\"=" + datasourceEncryptionType + "\n";
String jobConfig = SeaTunnelConfigUtil.generateConfig(env, sources, transforms, sinks);
return JobUtils.replaceJobConfigPlaceholders(jobConfig, executeParam);
}
Expand Down Expand Up @@ -571,6 +577,7 @@ private Config parseConfigWithOptionRule(
String connectorType,
Map<String, String> config,
OptionRule optionRule) {
ConfigShadeUtil.encryptData(config, datasourceEncryptionType);
return parseConfigWithOptionRule(
pluginType, connectorType, ConfigFactory.parseMap(config), optionRule);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.seatunnel.app.utils;


import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.app.common.Constants;
import org.apache.seatunnel.core.starter.utils.ConfigShadeUtils;

import java.util.Map;

@Slf4j
public class ConfigShadeUtil {

public static void encryptData(Map<String, String> datasourceConfig, String datasourceEncryptionType) {
String password = datasourceConfig.get(Constants.PASSWORD);
if(!password.isEmpty()) {
try {
datasourceConfig.replace(
Constants.PASSWORD,
ConfigShadeUtils.encryptOption(
datasourceEncryptionType, password));
} catch (IllegalArgumentException ex) {
log.warn("encrypt password failed");
}
}
}

public static void decryptData(Map<String, String> datasourceConfig, String datasourceEncryptionType) {
String password = datasourceConfig.get(Constants.PASSWORD);
if(!password.isEmpty()) {
try {
datasourceConfig.replace(
Constants.PASSWORD,
ConfigShadeUtils.decryptOption(
datasourceEncryptionType, password));
} catch (IllegalArgumentException ex) {
log.warn("decrypt password failed as password is not encrypted");
}
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ jwt:
secretKey:
algorithm: HS256

datasource:
encryption:
type: base64

---
spring:
config:
Expand Down