Skip to content

Commit

Permalink
[Bug][CDC] Fix jdbc sink generate update sql (apache#3940)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Jan 14, 2023
1 parent 1de3b78 commit 6a701d9
Show file tree
Hide file tree
Showing 7 changed files with 906 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;

import com.google.common.base.Strings;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.IntFunction;

@Slf4j
@RequiredArgsConstructor
public class JdbcOutputFormatBuilder {
@NonNull
Expand Down Expand Up @@ -132,8 +135,14 @@ private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateExec
String[] pkNames) {

return new InsertOrUpdateBatchStatementExecutor(
connection -> connection.prepareStatement(dialect.getInsertIntoStatement(table, rowType.getFieldNames())),
connection -> connection.prepareStatement(dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames)),
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
rowType.getFieldNames()),
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames),
rowType.getFieldNames()),
rowType,
dialect.getRowConverter());
}
Expand All @@ -144,12 +153,20 @@ private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateByQu
String[] pkNames,
SeaTunnelDataType[] pkTypes,
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor) {

SeaTunnelRowType keyRowType = new SeaTunnelRowType(pkNames, pkTypes);
return new InsertOrUpdateBatchStatementExecutor(
connection -> connection.prepareStatement(dialect.getRowExistsStatement(table, pkNames)),
connection -> connection.prepareStatement(dialect.getInsertIntoStatement(table, rowType.getFieldNames())),
connection -> connection.prepareStatement(dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames)),
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getRowExistsStatement(table, pkNames),
pkNames),
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
rowType.getFieldNames()),
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames),
rowType.getFieldNames()),
keyRowType,
keyExtractor,
rowType,
Expand All @@ -176,7 +193,10 @@ private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(Str
SeaTunnelRowType rowType,
JdbcRowConverter rowConverter) {
return new SimpleBatchStatementExecutor(
connection -> connection.prepareStatement(sql),
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
sql,
rowType.getFieldNames()),
rowType,
rowConverter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ default String getInsertIntoStatement(String tableName, String[] fieldNames) {
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders = Arrays.stream(fieldNames)
.map(fieldName -> "?")
.map(fieldName -> ":" + fieldName)
.collect(Collectors.joining(", "));
return String.format("INSERT INTO %s (%s) VALUES (%s)",
quoteIdentifier(tableName), columns, placeholders);
Expand All @@ -104,10 +104,10 @@ default String getInsertIntoStatement(String tableName, String[] fieldNames) {
*/
default String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
String setClause = Arrays.stream(fieldNames)
.map(fieldName -> String.format("%s = ?", quoteIdentifier(fieldName)))
.map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), fieldName))
.collect(Collectors.joining(", "));
String conditionClause = Arrays.stream(conditionFields)
.map(fieldName -> String.format("%s = ?", quoteIdentifier(fieldName)))
.map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), fieldName))
.collect(Collectors.joining(" AND "));
return String.format("UPDATE %s SET %s WHERE %s",
quoteIdentifier(tableName), setClause, conditionClause);
Expand All @@ -126,7 +126,7 @@ default String getUpdateStatement(String tableName, String[] fieldNames, String[
*/
default String getDeleteStatement(String tableName, String[] conditionFields) {
String conditionClause = Arrays.stream(conditionFields)
.map(fieldName -> format("%s = ?", quoteIdentifier(fieldName)))
.map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), fieldName))
.collect(Collectors.joining(" AND "));
return String.format("DELETE FROM %s WHERE %s",
quoteIdentifier(tableName), conditionClause);
Expand All @@ -144,7 +144,7 @@ default String getDeleteStatement(String tableName, String[] conditionFields) {
*/
default String getRowExistsStatement(String tableName, String[] conditionFields) {
String fieldExpressions = Arrays.stream(conditionFields)
.map(field -> format("%s = ?", quoteIdentifier(field)))
.map(field -> format("%s = :%s", quoteIdentifier(field), field))
.collect(Collectors.joining(" AND "));
return String.format("SELECT 1 FROM %s WHERE %s",
quoteIdentifier(tableName), fieldExpressions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Optional<String> getUpsertStatement(String tableName, String[] fieldNames
.filter(fieldName -> !Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
String valuesBinding = Arrays.stream(fieldNames)
.map(fieldName -> "? " + quoteIdentifier(fieldName))
.map(fieldName -> ":" + fieldName + " " + quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));

String usingClause = String.format("SELECT %s FROM DUAL", valuesBinding);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Optional<String> getUpsertStatement(String tableName, String[] fieldNames
.filter(fieldName -> !Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
String valuesBinding = Arrays.stream(fieldNames)
.map(fieldName -> "? " + quoteIdentifier(fieldName))
.map(fieldName -> ":" + fieldName + " " + quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));

String usingClause = String.format("SELECT %s", valuesBinding);
Expand Down
Loading

0 comments on commit 6a701d9

Please sign in to comment.