Skip to content

Commit

Permalink
Merge remote-tracking branch 'xj-apache-seatunnel/dev-jdbc-identifier…
Browse files Browse the repository at this point in the history
…' into dev-jdbc-identifier
  • Loading branch information
XiaoJiang521 committed Sep 9, 2023
2 parents 6e3b0b6 + 12c7d30 commit 3743fe1
Show file tree
Hide file tree
Showing 20 changed files with 64 additions and 64 deletions.
6 changes: 3 additions & 3 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| max_commit_attempts | Int | No | 3 |
| transaction_timeout_sec | Int | No | -1 |
| auto_commit | Boolean | No | true |
| field_ide | String | No | - |
| identifier_case | String | No | - |
| common-options | | no | - |

### driver [string]
Expand Down Expand Up @@ -137,9 +137,9 @@ exactly-once semantics

Automatic transaction commit is enabled by default

### field_ide [String]
### identifier_case [String]

The field "field_ide" is used to identify whether the field needs to be converted to uppercase or lowercase when
The field "identifier_case" is used to identify whether the field needs to be converted to uppercase or lowercase when
synchronizing from the source to the sink. "ORIGINAL" indicates no conversion is needed, "UPPERCASE" indicates
conversion to uppercase, and "LOWERCASE" indicates conversion to lowercase.

Expand Down
4 changes: 2 additions & 2 deletions docs/en/connector-v2/sink/Mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ semantics (using XA transaction guarantee).
| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures |
| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect<br/>exactly-once semantics |
| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default |
| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. |
| identifier_case | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. |
| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |

### Tips
Expand Down Expand Up @@ -192,7 +192,7 @@ sink {
database = test
table = sink_table
primary_keys = ["id","name"]
field_ide = UPPERCASE
identifier_case = UPPERCASE
}
}
```
Expand Down
4 changes: 2 additions & 2 deletions docs/en/connector-v2/sink/PostgreSql.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ semantics (using XA transaction guarantee).
| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures |
| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect<br/>exactly-once semantics |
| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default |
| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. |
| identifier_case | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. |
| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |

### Tips
Expand Down Expand Up @@ -198,7 +198,7 @@ sink {
database = test
table = sink_table
primary_keys = ["id","name"]
field_ide = UPPERCASE
identifier_case = UPPERCASE
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase;

import java.math.BigDecimal;
import java.util.List;
Expand Down Expand Up @@ -156,9 +156,9 @@ public interface JdbcOptions {
.noDefaultValue()
.withDescription("partition num");

Option<FieldIdeEnum> FIELD_IDE =
Options.key("field_ide")
.enumType(FieldIdeEnum.class)
Option<IdentifierCase> IDENTIFIER_CASE =
Options.key("identifier_case")
.enumType(IdentifierCase.class)
.noDefaultValue()
.withDescription("Whether case conversion is required");
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -227,11 +227,11 @@ default String extractTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}

default String getFieldIde(String identifier, String fieldIde) {
if (StringUtils.isEmpty(fieldIde)) {
default String getIdentifierCase(String identifier, String identifierCase) {
if (StringUtils.isEmpty(identifierCase)) {
return identifier;
}
switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) {
switch (IdentifierCase.valueOf(identifierCase.toUpperCase())) {
case LOWERCASE:
return identifier.toLowerCase();
case UPPERCASE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static JdbcDialect load(String url, String compatibleMode) {
* unambiguously process the given database URL.
* @return The loaded dialect.
*/
public static JdbcDialect load(String url, String compatibleMode, String fieldIde) {
public static JdbcDialect load(String url, String compatibleMode, String identifierCase) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
List<JdbcDialectFactory> foundFactories = discoverFactories(cl);

Expand Down Expand Up @@ -94,7 +94,7 @@ public static JdbcDialect load(String url, String compatibleMode, String fieldId
.collect(Collectors.joining("\n"))));
}

return matchingFactories.get(0).create(compatibleMode, fieldIde);
return matchingFactories.get(0).create(compatibleMode, identifierCase);
}

private static List<JdbcDialectFactory> discoverFactories(ClassLoader classLoader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum;

public enum FieldIdeEnum {
public enum IdentifierCase {
ORIGINAL("original"), // Original string form
UPPERCASE("uppercase"), // Convert to uppercase
LOWERCASE("lowercase"); // Convert to lowercase

private final String value;

FieldIdeEnum(String value) {
IdentifierCase(String value) {
this.value = value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public JdbcDialect create() {
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
return new MysqlDialect(fieldIde);
public JdbcDialect create(@Nonnull String compatibleMode, String identifierCase) {
return new MysqlDialect(identifierCase);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -32,12 +32,12 @@
import java.util.stream.Collectors;

public class MysqlDialect implements JdbcDialect {
public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
public String identifierCase = IdentifierCase.ORIGINAL.getValue();

public MysqlDialect() {}

public MysqlDialect(String fieldIde) {
this.fieldIde = fieldIde;
public MysqlDialect(String identifierCase) {
this.identifierCase = identifierCase;
}

@Override
Expand All @@ -57,7 +57,7 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {

@Override
public String quoteIdentifier(String identifier) {
return "`" + getFieldIde(identifier, fieldIde) + "`";
return "`" + getIdentifierCase(identifier, identifierCase) + "`";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public JdbcDialect create() {
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
public JdbcDialect create(@Nonnull String compatibleMode, String identifierCase) {
if ("oracle".equalsIgnoreCase(compatibleMode)) {
return new OracleDialect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -34,10 +34,10 @@
public class OracleDialect implements JdbcDialect {

private static final int DEFAULT_ORACLE_FETCH_SIZE = 128;
public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
public String identifierCase = IdentifierCase.ORIGINAL.getValue();

public OracleDialect(String fieldIde) {
this.fieldIde = fieldIde;
public OracleDialect(String identifierCase) {
this.identifierCase = identifierCase;
}

public OracleDialect() {}
Expand Down Expand Up @@ -71,11 +71,11 @@ public String quoteIdentifier(String identifier) {
sb.append("\"").append(parts[i]).append("\"").append(".");
}
return sb.append("\"")
.append(getFieldIde(parts[parts.length - 1], fieldIde))
.append(getIdentifierCase(parts[parts.length - 1], identifierCase))
.append("\"")
.toString();
}
return "\"" + getFieldIde(identifier, fieldIde) + "\"";
return "\"" + getIdentifierCase(identifier, identifierCase) + "\"";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public JdbcDialect create() {
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
return new OracleDialect(fieldIde);
public JdbcDialect create(@Nonnull String compatibleMode, String identifierCase) {
return new OracleDialect(identifierCase);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -34,12 +34,12 @@ public class PostgresDialect implements JdbcDialect {

public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;

public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
public String identifierCase = IdentifierCase.ORIGINAL.getValue();

public PostgresDialect() {}

public PostgresDialect(String fieldIde) {
this.fieldIde = fieldIde;
public PostgresDialect(String identifierCase) {
this.identifierCase = identifierCase;
}

@Override
Expand Down Expand Up @@ -113,12 +113,12 @@ public String quoteIdentifier(String identifier) {
sb.append("\"").append(parts[i]).append("\"").append(".");
}
return sb.append("\"")
.append(getFieldIde(parts[parts.length - 1], fieldIde))
.append(getIdentifierCase(parts[parts.length - 1], identifierCase))
.append("\"")
.toString();
}

return "\"" + getFieldIde(identifier, fieldIde) + "\"";
return "\"" + getIdentifierCase(identifier, identifierCase) + "\"";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public JdbcDialect create() {
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
public JdbcDialect create(@Nonnull String compatibleMode, String identifierCase) {
if ("postgresLow".equalsIgnoreCase(compatibleMode)) {
return new PostgresLowDialect(fieldIde);
return new PostgresLowDialect(identifierCase);
}
return new PostgresDialect(fieldIde);
return new PostgresDialect(identifierCase);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

public class PostgresLowDialect extends PostgresDialect {

public PostgresLowDialect(String fieldIde) {
this.fieldIde = fieldIde;
public PostgresLowDialect(String identifierCase) {
this.identifierCase = identifierCase;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase;

import java.util.Arrays;
import java.util.List;
Expand All @@ -29,12 +29,12 @@

public class SqlServerDialect implements JdbcDialect {

public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
public String identifierCase = IdentifierCase.ORIGINAL.getValue();

public SqlServerDialect() {}

public SqlServerDialect(String fieldIde) {
this.fieldIde = fieldIde;
public SqlServerDialect(String identifierCase) {
this.identifierCase = identifierCase;
}

@Override
Expand Down Expand Up @@ -125,12 +125,12 @@ public String quoteIdentifier(String identifier) {
sb.append("[").append(parts[i]).append("]").append(".");
}
return sb.append("[")
.append(getFieldIde(parts[parts.length - 1], fieldIde))
.append(getIdentifierCase(parts[parts.length - 1], identifierCase))
.append("]")
.toString();
}

return "[" + getFieldIde(identifier, fieldIde) + "]";
return "[" + getIdentifierCase(identifier, identifierCase) + "]";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public JdbcDialect create() {
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
return new SqlServerDialect(fieldIde);
public JdbcDialect create(@Nonnull String compatibleMode, String identifierCase) {
return new SqlServerDialect(identifierCase);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
Expand Down Expand Up @@ -110,9 +110,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
JdbcDialectLoader.load(
jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
config.get(JdbcOptions.FIELD_IDE) == null
? FieldIdeEnum.ORIGINAL.getValue()
: config.get(JdbcOptions.FIELD_IDE).getValue());
config.get(JdbcOptions.IDENTIFIER_CASE) == null
? IdentifierCase.ORIGINAL.getValue()
: config.get(JdbcOptions.IDENTIFIER_CASE).getValue());
this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
}

Expand Down
Loading

0 comments on commit 3743fe1

Please sign in to comment.