Skip to content

Commit

Permalink
[fix](cdc) configure table buckets for a single sink (#307)
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 authored Jan 31, 2024
1 parent 328f3c6 commit 007d524
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.EventType;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
Expand Down Expand Up @@ -246,9 +247,34 @@ public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessi
Preconditions.checkArgument(split.length == 2);
tableSchema.setDatabase(split[0]);
tableSchema.setTable(split[1]);
if (tableProperties.containsKey("table-buckets")) {
String tableBucketsConfig = tableProperties.get("table-buckets");
Map<String, Integer> tableBuckets = DatabaseSync.getTableBuckets(tableBucketsConfig);
Integer buckets = getTableSchemaBuckets(tableBuckets, tableSchema.getTable());
tableSchema.setTableBuckets(buckets);
}
return tableSchema;
}

@VisibleForTesting
public Integer getTableSchemaBuckets(Map<String, Integer> tableBucketsMap, String tableName) {
if (tableBucketsMap != null) {
// Firstly, if the table name is in the table-buckets map, set the buckets of the table.
if (tableBucketsMap.containsKey(tableName)) {
return tableBucketsMap.get(tableName);
}
// Secondly, iterate over the map to find a corresponding regular expression match,
for (Map.Entry<String, Integer> entry : tableBucketsMap.entrySet()) {

Pattern pattern = Pattern.compile(entry.getKey());
if (pattern.matcher(tableName).matches()) {
return entry.getValue();
}
}
}
return null;
}

private List<String> buildDistributeKeys(
List<String> primaryKeys, Map<String, FieldSchema> fields) {
if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ protected HashMap<Pattern, String> multiToOneRulesParser(
* @param tableBuckets the string of tableBuckets, eg:student:10,student_info:20,student.*:30
* @return The table name and buckets map. The key is table name, the value is buckets.
*/
public Map<String, Integer> getTableBuckets(String tableBuckets) {
public static Map<String, Integer> getTableBuckets(String tableBuckets) {
Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
String[] tableBucketsArray = tableBuckets.split(",");
for (String tableBucket : tableBucketsArray) {
Expand Down

0 comments on commit 007d524

Please sign in to comment.