Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: v1.1.5-snapshot rdb-adapter 由关键字引起的异常 (#3026 #2783) #3020

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions client-adapter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文

### 4.2 适配器表映射文件
修改 conf/rdb/mytest_user.yml文件:
```
``` yml
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # cannal的instance或者MQ的topic
outerAdapterKey: oracle1 # adapter key, 对应上面配置outAdapters中的key
Expand All @@ -309,7 +309,10 @@ dbMapping:
name:
role_id:
c_time:
test1:
test1:
targetKeywordsIdentifier: # 关键字、保留字标识符
prefix: '`' # 前缀,默认值为 `
suffix: '`' # 后缀,默认值为 `
```
导入的类型以目标表的元类型为准, 将自动转换

Expand Down
2 changes: 1 addition & 1 deletion client-adapter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.1.4</version>
<version>42.2.16</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.alibaba.otter.canal.client.adapter.rdb.config;

import java.util.LinkedHashMap;
import java.util.Map;
import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;

import org.apache.commons.lang.StringUtils;

import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
import java.util.LinkedHashMap;
import java.util.Map;

/**
* RDB表映射配置
Expand Down Expand Up @@ -93,24 +93,31 @@ public void validate() {

public static class DbMapping implements AdapterMapping {

private boolean mirrorDb = false; // 是否镜像库
private String database; // 数据库名或schema名
private String table; // 表名
private Map<String, String> targetPk = new LinkedHashMap<>(); // 目标表主键字段
private boolean mapAll = false; // 映射所有字段
private String targetDb; // 目标库名
private String targetTable; // 目标表名
private Map<String, String> targetColumns; // 目标表字段映射
private boolean mirrorDb = false; // 是否镜像库
private String database; // 数据库名或schema名
private String table; // 表名
private Map<String, String> targetPk = new LinkedHashMap<>(); // 目标表主键字段
private boolean mapAll = false; // 映射所有字段
private String targetDb; // 目标库名
private String targetTable; // 目标表名
private Map<String, String> targetColumns; // 目标表字段映射
private Map<String, String> targetKeywordsIdentifier = new LinkedHashMap<>(2); // 目标库保留字标识符

private boolean caseInsensitive = false; // 目标表不区分大小写,默认是否
private boolean caseInsensitive = false; // 目标表不区分大小写,默认是否

private String etlCondition; // etl条件sql
private String etlCondition; // etl条件sql

private int readBatch = 5000;
private int commitBatch = 5000; // etl等批量提交大小
private int readBatch = 5000;
private int commitBatch = 5000; // etl等批量提交大小

private Map<String, String> allMapColumns;

public DbMapping(){
// 默认值为"`", 兼容老代码
this.targetKeywordsIdentifier.put(RDBConstants.KEYWORDS_IDENTIFIER_PREFIX, "`");
this.targetKeywordsIdentifier.put(RDBConstants.KEYWORDS_IDENTIFIER_SUFFIX, "`");
}

public boolean getMirrorDb() {
return mirrorDb;
}
Expand Down Expand Up @@ -214,6 +221,22 @@ public void setCommitBatch(int commitBatch) {
this.commitBatch = commitBatch;
}

/**
* 保留字标识符前缀 [prefix]:前缀 [suffix]:后缀<br>
* mysql: <code>select `name`,`date` from user</code><br>
* postgresql: <code>select "name","date" from user</code><br>
* mssql: <code>select [name],[date] from user</code><br>
*
* @return 标识符
*/
public Map<String, String> getTargetKeywordsIdentifier() {
return targetKeywordsIdentifier;
}

public void setTargetKeywordsIdentifier(Map<String, String> targetKeywordsIdentifier) {
this.targetKeywordsIdentifier = targetKeywordsIdentifier;
}

public Map<String, String> getAllMapColumns() {
return allMapColumns;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.alibaba.otter.canal.client.adapter.rdb.config;

/**
* @author XuDaojie
* @since 2020/9/11
*/
public class RDBConstants {

/**
* 保留字标识符前缀
*/
public static final String KEYWORDS_IDENTIFIER_PREFIX = "prefix";
/**
* 保留字标识符后缀
*/
public static final String KEYWORDS_IDENTIFIER_SUFFIX = "suffix";
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
package com.alibaba.otter.canal.client.adapter.rdb.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
import com.alibaba.otter.canal.client.adapter.rdb.config.RDBConstants;
import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
import com.alibaba.otter.canal.client.adapter.support.Dml;
import com.alibaba.otter.canal.client.adapter.support.Util;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
Expand All @@ -17,20 +32,6 @@

import javax.sql.DataSource;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
import com.alibaba.otter.canal.client.adapter.support.Dml;
import com.alibaba.otter.canal.client.adapter.support.Util;

/**
* RDB同步操作业务
*
Expand Down Expand Up @@ -242,14 +243,15 @@ private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml

DbMapping dbMapping = config.getDbMapping();

Map<String, String> keywordsIdentifier = dbMapping.getTargetKeywordsIdentifier();
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

StringBuilder insertSql = new StringBuilder();
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");

columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append("`")
columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_PREFIX))
.append(targetColumnName)
.append("`")
.append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_SUFFIX))
.append(","));
int len = insertSql.length();
insertSql.delete(len - 1, len).append(") VALUES (");
Expand Down Expand Up @@ -314,6 +316,7 @@ private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml

DbMapping dbMapping = config.getDbMapping();

Map<String, String> keywordsIdentifier = dbMapping.getTargetKeywordsIdentifier();
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
Expand All @@ -332,7 +335,10 @@ private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml
if (!targetColumnNames.isEmpty()) {
hasMatched = true;
for (String targetColumnName : targetColumnNames) {
updateSql.append("`").append(targetColumnName).append("`").append("=?, ");
updateSql.append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_PREFIX))
.append(targetColumnName)
.append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_SUFFIX))
.append("=?, ");
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
if (type == null) {
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
Expand Down Expand Up @@ -445,14 +451,19 @@ private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sq

private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
Map<String, String> keywordsIdentifier = dbMapping.getTargetKeywordsIdentifier();

// 拼接主键
for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
String targetColumnName = entry.getKey();
String srcColumnName = entry.getValue();
if (srcColumnName == null) {
srcColumnName = Util.cleanColumn(targetColumnName);
}
sql.append("`").append(targetColumnName).append("`").append("=? AND ");
sql.append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_PREFIX))
.append(targetColumnName)
.append(keywordsIdentifier.get(RDBConstants.KEYWORDS_IDENTIFIER_SUFFIX))
.append("=? AND ");
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
if (type == null) {
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.alibaba.otter.canal.client.adapter.rdb.support;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -11,9 +14,6 @@

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* sql批量执行器
*
Expand Down Expand Up @@ -60,6 +60,9 @@ public void execute(String sql, List<Map<String, ?>> values) throws SQLException
SyncUtil.setPStmt(type, pstmt, value, i + 1);
}

if (logger.isDebugEnabled()) {
logger.debug("DML: {}", pstmt.toString());
}
pstmt.execute();
idx.incrementAndGet();
pstmt.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,26 @@

import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.support.Util;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Reader;
import java.io.StringReader;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

public class SyncUtil {
private static final Logger logger = LoggerFactory.getLogger(SyncUtil.class);

public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Map<String, Object> data) {
return getColumnsMap(dbMapping, data.keySet());
Expand Down Expand Up @@ -54,10 +59,10 @@ public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMappin
/**
* 设置 preparedStatement
*
* @param type sqlType
* @param type sqlType
* @param pstmt 需要设置的preparedStatement
* @param value 值
* @param i 索引号
* @param i 索引号
*/
public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
switch (type) {
Expand Down Expand Up @@ -258,9 +263,9 @@ public static void setPStmt(int type, PreparedStatement pstmt, Object value, int
public static String getDbTableName(MappingConfig.DbMapping dbMapping) {
String result = "";
if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) {
result += ("`" + dbMapping.getTargetDb() + "`.");
result += dbMapping.getTargetDb() + ".";
}
result += ("`" + dbMapping.getTargetTable() + "`");
result += dbMapping.getTargetTable();
return result;
}
}