Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Improve][Connector-v2] Support '-' in filedName placeholders in jdbc…
Browse files Browse the repository at this point in the history
… sink
dailai committed Dec 6, 2024
1 parent e6f92fd commit b2f5cbe
Showing 7 changed files with 30 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -47,6 +47,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@@ -667,29 +669,25 @@ public static FieldNamedPreparedStatement prepareStatement(
connection.prepareStatement(parsedSQL), indexMapping);
}

private static String parseNamedStatement(String sql, Map<String, List<Integer>> paramMap) {
StringBuilder parsedSql = new StringBuilder();
int fieldIndex = 1; // SQL statement parameter index starts from 1
int length = sql.length();
for (int i = 0; i < length; i++) {
char c = sql.charAt(i);
if (':' == c) {
int j = i + 1;
while (j < length && Character.isJavaIdentifierPart(sql.charAt(j))) {
j++;
}
String parameterName = sql.substring(i + 1, j);
checkArgument(
!parameterName.isEmpty(),
"Named parameters in SQL statement must not be empty.");
paramMap.computeIfAbsent(parameterName, n -> new ArrayList<>()).add(fieldIndex);
fieldIndex++;
i = j - 1;
parsedSql.append('?');
} else {
parsedSql.append(c);
}
public static String parseNamedStatement(String sql, Map<String, List<Integer>> paramMap) {
Pattern pattern =
Pattern.compile(":([\\p{L}\\p{Nl}\\p{Nd}\\p{Pc}\\$\\-\\.@%&*#~!?^+=<>|]+)");
Matcher matcher = pattern.matcher(sql);

StringBuffer result = new StringBuffer();
int fieldIndex = 1;

while (matcher.find()) {
String parameterName = matcher.group(1);
checkArgument(
!parameterName.isEmpty(),
"Named parameters in SQL statement must not be empty.");
paramMap.computeIfAbsent(parameterName, n -> new ArrayList<>()).add(fieldIndex++);
matcher.appendReplacement(result, "?");
}
return parsedSql.toString();

matcher.appendTail(result);

return result.toString();
}
}
Original file line number Diff line number Diff line change
@@ -104,7 +104,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
private static final String CREATE_SQL =
"CREATE TABLE IF NOT EXISTS %s\n"
+ "(\n"
+ " `c_bit_1` bit(1) DEFAULT NULL,\n"
+ " `c-bit_1` bit(1) DEFAULT NULL,\n"
+ " `c_bit_8` bit(8) DEFAULT NULL,\n"
+ " `c_bit_16` bit(16) DEFAULT NULL,\n"
+ " `c_bit_32` bit(32) DEFAULT NULL,\n"
@@ -191,7 +191,7 @@ protected void checkResult(
String executeKey, TestContainer container, Container.ExecResult execResult) {
String[] fieldNames =
new String[] {
"c_bit_1",
"c-bit_1",
"c_bit_8",
"c_bit_16",
"c_bit_32",
@@ -249,7 +249,7 @@ String driverUrl() {
Pair<String[], List<SeaTunnelRow>> initTestData() {
String[] fieldNames =
new String[] {
"c_bit_1",
"c-bit_1",
"c_bit_8",
"c_bit_16",
"c_bit_32",
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ sink {
user = "root"
password = "Abc!@#135_seatunnel"

query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ CREATE TABLE sink_table WITH (


INSERT INTO sink_table
SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
SELECT `c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ sink {
user = "root"
password = "Abc!@#135_seatunnel"
connection_check_timeout_sec = 100
query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ CREATE TABLE sink_table WITH (


CREATE TABLE temp1 AS
SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
SELECT `c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
@@ -58,4 +58,4 @@ CREATE TABLE temp1 AS


INSERT INTO sink_table SELECT * FROM temp1;

Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ sink {
user = "root"
password = "Abc!@#135_seatunnel"
connection_check_timeout_sec = 100
query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,

0 comments on commit b2f5cbe

Please sign in to comment.