diff --git a/client-adapter/README.md b/client-adapter/README.md index 80c62c0b54..3763bb7797 100644 --- a/client-adapter/README.md +++ b/client-adapter/README.md @@ -292,7 +292,7 @@ adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文 ### 4.2 适配器表映射文件 修改 conf/rdb/mytest_user.yml文件: -``` +``` yml dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值 destination: example # cannal的instance或者MQ的topic outerAdapterKey: oracle1 # adapter key, 对应上面配置outAdapters中的key @@ -309,7 +309,10 @@ dbMapping: name: role_id: c_time: - test1: + test1: + targetKeywordsIdentifier: # 关键字、保留字标识符 + prefix: '`' # 前缀,默认值为 ` + suffix: '`' # 后缀,默认值为 ` ``` 导入的类型以目标表的元类型为准, 将自动转换 diff --git a/client-adapter/pom.xml b/client-adapter/pom.xml index 4af309199b..26ac8a6434 100644 --- a/client-adapter/pom.xml +++ b/client-adapter/pom.xml @@ -177,7 +177,7 @@ org.postgresql postgresql - 42.1.4 + 42.2.16 com.oracle diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java index 40f7e2f79b..7b4abd08a7 100644 --- a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java @@ -1,11 +1,11 @@ package com.alibaba.otter.canal.client.adapter.rdb.config; -import java.util.LinkedHashMap; -import java.util.Map; +import com.alibaba.otter.canal.client.adapter.support.AdapterConfig; import org.apache.commons.lang.StringUtils; -import com.alibaba.otter.canal.client.adapter.support.AdapterConfig; +import java.util.LinkedHashMap; +import java.util.Map; /** * RDB表映射配置 @@ -93,24 +93,31 @@ public void validate() { public static class DbMapping implements AdapterMapping { - private boolean mirrorDb = false; // 是否镜像库 - private String database; // 数据库名或schema名 - private String table; // 表名 - private Map targetPk = new LinkedHashMap<>(); // 目标表主键字段 - private boolean mapAll = false; // 映射所有字段 - private String targetDb; // 目标库名 - private String targetTable; // 目标表名 - private Map targetColumns; // 目标表字段映射 + private boolean mirrorDb = false; // 是否镜像库 + private String database; // 数据库名或schema名 + private String table; // 表名 + private Map targetPk = new LinkedHashMap<>(); // 目标表主键字段 + private boolean mapAll = false; // 映射所有字段 + private String targetDb; // 目标库名 + private String targetTable; // 目标表名 + private Map targetColumns; // 目标表字段映射 + private Map targetKeywordsIdentifier = new LinkedHashMap<>(2); // 目标库保留字标识符 - private boolean caseInsensitive = false; // 目标表不区分大小写,默认是否 + private boolean caseInsensitive = false; // 目标表不区分大小写,默认是否 - private String etlCondition; // etl条件sql + private String etlCondition; // etl条件sql - private int readBatch = 5000; - private int commitBatch = 5000; // etl等批量提交大小 + private int readBatch = 5000; + private int commitBatch = 5000; // etl等批量提交大小 private Map allMapColumns; + public DbMapping(){ + // 默认值为"`", 兼容老代码 + this.targetKeywordsIdentifier.put(RDBConstants.KEYWORDS_IDENTIFIER_PREFIX, "`"); + this.targetKeywordsIdentifier.put(RDBConstants.KEYWORDS_IDENTIFIER_SUFFIX, "`"); + } + public boolean getMirrorDb() { return mirrorDb; } @@ -214,6 +221,22 @@ public void setCommitBatch(int commitBatch) { this.commitBatch = commitBatch; } + /** + * 保留字标识符前缀 [prefix]:前缀 [suffix]:后缀
+ * mysql: select `name`,`date` from user
+ * postgresql: select "name","date" from user
+ * mssql: select [name],[date] from user
+ * + * @return 标识符 + */ + public Map getTargetKeywordsIdentifier() { + return targetKeywordsIdentifier; + } + + public void setTargetKeywordsIdentifier(Map targetKeywordsIdentifier) { + this.targetKeywordsIdentifier = targetKeywordsIdentifier; + } + public Map getAllMapColumns() { return allMapColumns; } diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/RDBConstants.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/RDBConstants.java new file mode 100644 index 0000000000..1626510a72 --- /dev/null +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/RDBConstants.java @@ -0,0 +1,17 @@ +package com.alibaba.otter.canal.client.adapter.rdb.config; + +/** + * @author XuDaojie + * @since 2020/9/11 + */ +public class RDBConstants { + + /** + * 保留字标识符前缀 + */ + public static final String KEYWORDS_IDENTIFIER_PREFIX = "prefix"; + /** + * 保留字标识符后缀 + */ + public static final String KEYWORDS_IDENTIFIER_SUFFIX = "suffix"; +} diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java index d27bb54480..57087798b9 100644 --- a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java @@ -1,5 +1,20 @@ package com.alibaba.otter.canal.client.adapter.rdb.service; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig; +import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping; +import com.alibaba.otter.canal.client.adapter.rdb.config.RDBConstants; +import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor; +import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml; +import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil; +import com.alibaba.otter.canal.client.adapter.support.Dml; +import com.alibaba.otter.canal.client.adapter.support.Util; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.sql.Connection; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -17,20 +32,6 @@ import javax.sql.DataSource; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; -import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig; -import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping; -import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor; -import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml; -import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil; -import com.alibaba.otter.canal.client.adapter.support.Dml; -import com.alibaba.otter.canal.client.adapter.support.Util; - /** * RDB同步操作业务 * @@ -242,14 +243,15 @@ private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml DbMapping dbMapping = config.getDbMapping(); + Map keywordsIdentifier = dbMapping.getTargetKeywordsIdentifier(); Map columnsMap = SyncUtil.getColumnsMap(dbMapping, data); StringBuilder insertSql = new StringBuilder(); insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" ("); - columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append("`") + columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_PREFIX)) .append(targetColumnName) - .append("`") + .append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_SUFFIX)) .append(",")); int len = insertSql.length(); insertSql.delete(len - 1, len).append(") VALUES ("); @@ -314,6 +316,7 @@ private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml DbMapping dbMapping = config.getDbMapping(); + Map keywordsIdentifier = dbMapping.getTargetKeywordsIdentifier(); Map columnsMap = SyncUtil.getColumnsMap(dbMapping, data); Map ctype = getTargetColumnType(batchExecutor.getConn(), config); @@ -332,7 +335,10 @@ private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml if (!targetColumnNames.isEmpty()) { hasMatched = true; for (String targetColumnName : targetColumnNames) { - updateSql.append("`").append(targetColumnName).append("`").append("=?, "); + updateSql.append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_PREFIX)) + .append(targetColumnName) + .append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_SUFFIX)) + .append("=?, "); Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase()); if (type == null) { throw new RuntimeException("Target column: " + targetColumnName + " not matched"); @@ -445,6 +451,8 @@ private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sq private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map ctype, List> values, Map d, Map o) { + Map keywordsIdentifier = dbMapping.getTargetKeywordsIdentifier(); + // 拼接主键 for (Map.Entry entry : dbMapping.getTargetPk().entrySet()) { String targetColumnName = entry.getKey(); @@ -452,7 +460,10 @@ private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sq if (srcColumnName == null) { srcColumnName = Util.cleanColumn(targetColumnName); } - sql.append("`").append(targetColumnName).append("`").append("=? AND "); + sql.append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_PREFIX)) + .append(targetColumnName) + .append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_SUFFIX)) + .append("=? AND "); Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase()); if (type == null) { throw new RuntimeException("Target column: " + targetColumnName + " not matched"); diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java index 0d05a6716a..2a3e37a74b 100644 --- a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java @@ -1,5 +1,8 @@ package com.alibaba.otter.canal.client.adapter.rdb.support; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.sql.Connection; import java.sql.PreparedStatement; @@ -11,9 +14,6 @@ import javax.sql.DataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * sql批量执行器 * @@ -60,6 +60,9 @@ public void execute(String sql, List> values) throws SQLException SyncUtil.setPStmt(type, pstmt, value, i + 1); } + if (logger.isDebugEnabled()) { + logger.debug("DML: {}", pstmt.toString()); + } pstmt.execute(); idx.incrementAndGet(); pstmt.close(); diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java index e2e54018b7..83e5f1e347 100644 --- a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java @@ -2,21 +2,26 @@ import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig; import com.alibaba.otter.canal.client.adapter.support.Util; + import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.Reader; import java.io.StringReader; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; -import java.sql.*; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; public class SyncUtil { - private static final Logger logger = LoggerFactory.getLogger(SyncUtil.class); public static Map getColumnsMap(MappingConfig.DbMapping dbMapping, Map data) { return getColumnsMap(dbMapping, data.keySet()); @@ -54,10 +59,10 @@ public static Map getColumnsMap(MappingConfig.DbMapping dbMappin /** * 设置 preparedStatement * - * @param type sqlType + * @param type sqlType * @param pstmt 需要设置的preparedStatement * @param value 值 - * @param i 索引号 + * @param i 索引号 */ public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException { switch (type) { @@ -258,9 +263,9 @@ public static void setPStmt(int type, PreparedStatement pstmt, Object value, int public static String getDbTableName(MappingConfig.DbMapping dbMapping) { String result = ""; if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) { - result += ("`" + dbMapping.getTargetDb() + "`."); + result += dbMapping.getTargetDb() + "."; } - result += ("`" + dbMapping.getTargetTable() + "`"); + result += dbMapping.getTargetTable(); return result; } }