Skip to content

Commit

Permalink
[bugfix] fix mysql create table comment special string bug (#6998)
Browse files Browse the repository at this point in the history
  • Loading branch information
chl-wxp authored Jun 17, 2024
1 parent 59720a7 commit 904e9cf
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ private String buildColumnIdentifySql(
}

if (column.getComment() != null) {
columnSqls.add("COMMENT '" + column.getComment() + "'");
columnSqls.add(
"COMMENT '"
+ column.getComment().replace("'", "''").replace("\\", "\\\\")
+ "'");
}

return String.join(" ", columnSqls);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
Expand Down Expand Up @@ -66,7 +67,7 @@ public class JdbcMySqlSaveModeCatalogIT extends TestSuiteBase implements TestRes
"CREATE TABLE IF NOT EXISTS mysql_auto_create\n"
+ "(\n "
+ "`id` int(11) NOT NULL AUTO_INCREMENT,\n"
+ " `f_binary` binary(64) DEFAULT NULL,\n"
+ " `f_binary` binary(64) DEFAULT NULL COMMENT '\"#¥%……&*();;'',,..``````//''@特殊注释''\\\\''\"',\n"
+ " `f_smallint` smallint(6) DEFAULT NULL,\n"
+ " `f_smallint_unsigned` smallint(5) unsigned DEFAULT NULL,\n"
+ " `f_mediumint` mediumint(9) DEFAULT NULL,\n"
Expand Down Expand Up @@ -153,30 +154,38 @@ public void startUp() throws Exception {
@Test
public void testCatalog() {
TablePath tablePathMySql = TablePath.of("auto", "mysql_auto_create");
TablePath tablePathMySql_Sink = TablePath.of("auto", "mysql_auto_create_sink");
TablePath tablePathMySqlSink = TablePath.of("auto", "mysql_auto_create_sink");
MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", MYSQL_PASSWORD, MysqlUrlInfo);
mySqlCatalog.open();
CatalogTable catalogTable = mySqlCatalog.getTable(tablePathMySql);
// source comment
Assertions.assertEquals(
catalogTable.getTableSchema().getColumns().get(1).getComment(),
"\"#¥%……&*();;',,..``````//'@特殊注释'\\'\"");
// sink tableExists ?
boolean tableExistsBefore = mySqlCatalog.tableExists(tablePathMySql_Sink);
boolean tableExistsBefore = mySqlCatalog.tableExists(tablePathMySqlSink);
Assertions.assertFalse(tableExistsBefore);
// create table
mySqlCatalog.createTable(tablePathMySql_Sink, catalogTable, true);
boolean tableExistsAfter = mySqlCatalog.tableExists(tablePathMySql_Sink);
mySqlCatalog.createTable(tablePathMySqlSink, catalogTable, true);
boolean tableExistsAfter = mySqlCatalog.tableExists(tablePathMySqlSink);
Assertions.assertTrue(tableExistsAfter);
// comment
final CatalogTable sinkTable = mySqlCatalog.getTable(tablePathMySqlSink);
final Column column = sinkTable.getTableSchema().getColumns().get(1);
Assertions.assertEquals(column.getComment(), "\"#¥%……&*();;',,..``````//'@特殊注释'\\'\"");
// isExistsData ?
boolean existsDataBefore = mySqlCatalog.isExistsData(tablePathMySql_Sink);
boolean existsDataBefore = mySqlCatalog.isExistsData(tablePathMySqlSink);
Assertions.assertFalse(existsDataBefore);
// insert one data
mySqlCatalog.executeSql(tablePathMySql_Sink, customSql);
boolean existsDataAfter = mySqlCatalog.isExistsData(tablePathMySql_Sink);
mySqlCatalog.executeSql(tablePathMySqlSink, customSql);
boolean existsDataAfter = mySqlCatalog.isExistsData(tablePathMySqlSink);
Assertions.assertTrue(existsDataAfter);
// truncateTable
mySqlCatalog.truncateTable(tablePathMySql_Sink, true);
Assertions.assertFalse(mySqlCatalog.isExistsData(tablePathMySql_Sink));
mySqlCatalog.truncateTable(tablePathMySqlSink, true);
Assertions.assertFalse(mySqlCatalog.isExistsData(tablePathMySqlSink));
// drop table
mySqlCatalog.dropTable(tablePathMySql_Sink, true);
Assertions.assertFalse(mySqlCatalog.tableExists(tablePathMySql_Sink));
mySqlCatalog.dropTable(tablePathMySqlSink, true);
Assertions.assertFalse(mySqlCatalog.tableExists(tablePathMySqlSink));
mySqlCatalog.close();
}

Expand Down

0 comments on commit 904e9cf

Please sign in to comment.