Skip to content

Commit

Permalink
[Improve][Connector-v2] Optimize the count table rows for jdbc-oracle…
Browse files Browse the repository at this point in the history
… and oracle-cdc (apache#7248)
  • Loading branch information
dailai authored Jul 25, 2024
1 parent cc59499 commit 0d08b20
Show file tree
Hide file tree
Showing 23 changed files with 594 additions and 179 deletions.
198 changes: 56 additions & 142 deletions docs/en/connector-v2/source/Jdbc.md

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions docs/en/connector-v2/source/Oracle-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ exit;
| sample-sharding.threshold | Integer | No | 1000 | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. |
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. |
| exactly_once | Boolean | No | false | Enable exactly once semantic. |
| use_select_count | Boolean | No | false | Use `select count(*)` to get the table row count in the full stage rather than other methods. Enable this when running `select count(*)` directly is faster than updating the table statistics with the `analyze table` SQL. |
| skip_analyze | Boolean | No | false | Skip the `analyze table` step when counting table rows in the full stage. Enable this when you already schedule the `analyze table` SQL to update the related table statistics periodically, or when your table data does not change frequently. |
| format | Enum | No | DEFAULT | Optional output format for Oracle CDC, valid enumerations are `DEFAULT`, `COMPATIBLE_DEBEZIUM_JSON`. |
| debezium | Config | No | - | Pass-through [Debezium's properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/oracle.adoc#connector-properties) to Debezium Embedded Engine which is used to capture data changes from Oracle server. |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |
Expand All @@ -270,6 +272,44 @@ source {
}
```

> Use `select count(*)` instead of `analyze table` to count the table rows in the full stage.
>
> ```conf
> source {
> # This is an example source plugin **only for testing and demonstrating the source plugin feature**
> Oracle-CDC {
> result_table_name = "customers"
> use_select_count = true
> username = "system"
> password = "oracle"
> database-names = ["XE"]
> schema-names = ["DEBEZIUM"]
> table-names = ["XE.DEBEZIUM.FULL_TYPES"]
> base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
> source.reader.close.timeout = 120000
> }
> }
> ```
>
> Use `select NUM_ROWS from all_tables` to get the table row count, but skip the `analyze table` step.
>
> ```conf
> source {
> # This is an example source plugin **only for testing and demonstrating the source plugin feature**
> Oracle-CDC {
> result_table_name = "customers"
> skip_analyze = true
> username = "system"
> password = "oracle"
> database-names = ["XE"]
> schema-names = ["DEBEZIUM"]
> table-names = ["XE.DEBEZIUM.FULL_TYPES"]
> base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
> source.reader.close.timeout = 120000
> }
> }
> ```
### Support custom primary key for table
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
import lombok.Getter;

import java.util.List;
import java.util.Properties;
Expand All @@ -32,11 +33,17 @@
* Describes the connection information of the Oracle database and the configuration information for
* performing snapshotting and streaming reading, such as splitSize.
*/
@Getter
public class OracleSourceConfig extends JdbcSourceConfig {

private static final long serialVersionUID = 1L;

private final Boolean useSelectCount;
private final Boolean skipAnalyze;

public OracleSourceConfig(
Boolean useSelectCount,
Boolean skipAnalyze,
StartupConfig startupConfig,
StopConfig stopConfig,
List<String> databaseList,
Expand Down Expand Up @@ -82,6 +89,8 @@ public OracleSourceConfig(
connectMaxRetries,
connectionPoolSize,
exactlyOnce);
this.useSelectCount = useSelectCount;
this.skipAnalyze = skipAnalyze;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory {
private static final String DRIVER_CLASS_NAME = "oracle.jdbc.driver.OracleDriver";

private List<String> schemaList;

private Boolean useSelectCount;

private Boolean skipAnalyze;
/**
* An optional list of regular expressions that match schema names to be monitored; any schema
* name not included in the whitelist will be excluded from monitoring. By default all
Expand All @@ -48,6 +52,16 @@ public JdbcSourceConfigFactory schemaList(List<String> schemaList) {
return this;
}

/**
 * Sets the {@code use_select_count} flag that this factory hands to every {@link
 * OracleSourceConfig} it creates.
 *
 * @param useSelectCount flag value to store; stored as given, without a null check
 * @return this factory, to allow call chaining
 */
public JdbcSourceConfigFactory useSelectCount(Boolean useSelectCount) {
this.useSelectCount = useSelectCount;
return this;
}

/**
 * Sets the {@code skip_analyze} flag that this factory hands to every {@link
 * OracleSourceConfig} it creates.
 *
 * @param skipAnalyze flag value to store; stored as given, without a null check
 * @return this factory, to allow call chaining
 */
public JdbcSourceConfigFactory skipAnalyze(Boolean skipAnalyze) {
this.skipAnalyze = skipAnalyze;
return this;
}

/** Creates a new {@link OracleSourceConfig} for the given subtask {@code subtaskId}. */
public OracleSourceConfig create(int subtask) {

Expand Down Expand Up @@ -123,6 +137,8 @@ public OracleSourceConfig create(int subtask) {
}

return new OracleSourceConfig(
useSelectCount,
skipAnalyze,
startupConfig,
stopConfig,
databaseList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(Readonly
configFactory.startupOptions(startupConfig);
configFactory.stopOptions(stopConfig);
configFactory.schemaList(config.get(OracleSourceOptions.SCHEMA_NAMES));
configFactory.useSelectCount(config.get(OracleSourceOptions.USE_SELECT_COUNT));
configFactory.skipAnalyze(config.get(OracleSourceOptions.SKIP_ANALYZE));
configFactory.originUrl(config.get(JdbcCatalogOptions.BASE_URL));
return configFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public OptionRule optionRule() {
JdbcCatalogOptions.BASE_URL,
JdbcSourceOptions.DATABASE_NAMES,
OracleSourceOptions.SCHEMA_NAMES,
OracleSourceOptions.USE_SELECT_COUNT,
OracleSourceOptions.SKIP_ANALYZE,
JdbcSourceOptions.SERVER_TIME_ZONE,
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
JdbcSourceOptions.CONNECT_MAX_RETRIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,16 @@ public class OracleSourceOptions {
.listType()
.noDefaultValue()
.withDescription("Schema name of the database to monitor.");

/**
 * Boolean option {@code use_select_count} (default {@code false}). When enabled, the Oracle row
 * count query is {@code select count(*)} on the table instead of reading {@code NUM_ROWS} from
 * {@code all_tables}.
 */
public static final Option<Boolean> USE_SELECT_COUNT =
Options.key("use_select_count")
.booleanType()
.defaultValue(false)
.withDescription("Use select count for table count in full stage");

/**
 * Boolean option {@code skip_analyze} (default {@code false}). When enabled, the {@code analyze
 * table ... compute statistics for table} statement is not executed before {@code NUM_ROWS} is
 * read from {@code all_tables}.
 */
public static final Option<Boolean> SKIP_ANALYZE =
Options.key("skip_analyze")
.booleanType()
.defaultValue(false)
.withDescription("Skip the analysis of table count in full stage");
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleTypeUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils;

Expand All @@ -41,8 +42,11 @@
@Slf4j
public class OracleChunkSplitter extends AbstractJdbcSourceChunkSplitter {

private final OracleSourceConfig oracleSourceConfig;

/**
 * Creates a chunk splitter for Oracle tables.
 *
 * @param sourceConfig source configuration; it is downcast to {@link OracleSourceConfig}, so
 *     passing any other {@link JdbcSourceConfig} subtype fails with a {@link
 *     ClassCastException}
 * @param dialect JDBC dialect, forwarded unchanged to the superclass constructor
 */
public OracleChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
super(sourceConfig, dialect);
// Keep the Oracle-typed view of the config so the row count query can be given the
// Oracle-specific settings.
this.oracleSourceConfig = (OracleSourceConfig) sourceConfig;
}

@Override
Expand Down Expand Up @@ -80,7 +84,7 @@ public Object queryNextChunkMax(

/**
 * Returns the row count of {@code tableId} by delegating to {@link
 * OracleUtils#queryApproximateRowCnt}, passing along the Oracle source configuration held by
 * this splitter.
 *
 * @param jdbc connection used to run the row count query
 * @param tableId table whose rows are counted
 * @return the row count reported by the delegate
 * @throws SQLException if the delegate's query fails
 */
@Override
public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
    // Single delegate call using the config-aware overload; the earlier two-argument call
    // (jdbc, tableId) no longer matches the OracleUtils signature and must not remain here.
    return OracleUtils.queryApproximateRowCnt(oracleSourceConfig, jdbc, tableId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset;

import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -81,27 +82,41 @@ public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String
});
}

public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
public static long queryApproximateRowCnt(
OracleSourceConfig oracleSourceConfig, JdbcConnection jdbc, TableId tableId)
throws SQLException {
final String analyzeTable =
String.format(
"analyze table %s compute statistics for table",
quoteSchemaAndTable(tableId));
final String rowCountQuery =
String.format(
"select NUM_ROWS from all_tables where TABLE_NAME = '%s'", tableId.table());
return jdbc.execute(analyzeTable)
.queryAndMap(
rowCountQuery,
rs -> {
if (!rs.next()) {
throw new SQLException(
String.format(
"No result returned after running query [%s]",
rowCountQuery));
}
return rs.getLong(1);
});
Boolean useSelectCount = oracleSourceConfig.getUseSelectCount();
String rowCountQuery;
if (useSelectCount) {
rowCountQuery = String.format("select count(*) from %s", quoteSchemaAndTable(tableId));
} else {
rowCountQuery =
String.format(
"select NUM_ROWS from all_tables where TABLE_NAME = '%s'",
tableId.table());
Boolean skipAnalyze = oracleSourceConfig.getSkipAnalyze();
if (!skipAnalyze) {
final String analyzeTable =
String.format(
"analyze table %s compute statistics for table",
quoteSchemaAndTable(tableId));
// not skip analyze
log.info("analyze table sql: {}", analyzeTable);
jdbc.execute(analyzeTable);
}
}
log.info("row count query: {}", rowCountQuery);
return jdbc.queryAndMap(
rowCountQuery,
rs -> {
if (!rs.next()) {
throw new SQLException(
String.format(
"No result returned after running query [%s]",
rowCountQuery));
}
return rs.getLong(1);
});
}

public static Object queryMin(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,16 @@ public interface JdbcSourceOptions {
+ "The value represents the denominator of the sampling rate fraction. "
+ "For example, a value of 1000 means a sampling rate of 1/1000. "
+ "This parameter is used when the sample sharding strategy is triggered.");

/**
 * Boolean option {@code use_select_count} (default {@code false}): use select count to obtain
 * the table count.
 */
Option<Boolean> USE_SELECT_COUNT =
Options.key("use_select_count")
.booleanType()
.defaultValue(false)
.withDescription("Use select count for table count");

/**
 * Boolean option {@code skip_analyze} (default {@code false}): skip the analysis step when
 * obtaining the table count.
 */
Option<Boolean> SKIP_ANALYZE =
Options.key("skip_analyze")
.booleanType()
.defaultValue(false)
.withDescription("Skip the analysis of table count");
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public class JdbcSourceTableConfig implements Serializable {
@JsonProperty("partition_upper_bound")
private BigDecimal partitionEnd;

@JsonProperty("use_select_count")
private Boolean useSelectCount;

@JsonProperty("skip_analyze")
private Boolean skipAnalyze;

@Tolerate
public JdbcSourceTableConfig() {}

Expand All @@ -79,6 +85,8 @@ public static List<JdbcSourceTableConfig> of(ReadonlyConfig connectorConfig) {
.partitionNumber(connectorConfig.get(JdbcOptions.PARTITION_NUM))
.partitionStart(connectorConfig.get(JdbcOptions.PARTITION_LOWER_BOUND))
.partitionEnd(connectorConfig.get(JdbcOptions.PARTITION_UPPER_BOUND))
.useSelectCount(connectorConfig.get(JdbcSourceOptions.USE_SELECT_COUNT))
.skipAnalyze(connectorConfig.get(JdbcSourceOptions.SKIP_ANALYZE))
.build();
tableList = Collections.singletonList(tableProperty);
}
Expand Down
Loading

0 comments on commit 0d08b20

Please sign in to comment.