From 8e8dee072f89357fbe236d20a0d01ca99b10fe5c Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Thu, 28 Apr 2022 16:25:56 +0800 Subject: [PATCH 1/2] [mysql] Generates multiple chunks when approximate row count is bigger than chunk size (#1191) --- .../base/experimental/MySqlChunkSplitter.java | 18 +++++++++++------ .../mysql/source/assigners/ChunkSplitter.java | 20 ++++++++++++------- .../MySqlSnapshotSplitAssignerTest.java | 13 ++++++++++++ 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlChunkSplitter.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlChunkSplitter.java index b07462382c1..1a281fbc81a 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlChunkSplitter.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlChunkSplitter.java @@ -186,7 +186,7 @@ private List splitTableIntoChunks( // the minimum dynamic chunk size is at least 1 final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); return splitEvenlySizedChunks( - tableId, min, max, approximateRowCnt, dynamicChunkSize); + tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); } else { return splitUnevenlySizedChunks( jdbc, tableId, splitColumnName, min, max, chunkSize); @@ -201,12 +201,18 @@ private List splitTableIntoChunks( * and tumble chunks in step size. */ private List splitEvenlySizedChunks( - TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) { + TableId tableId, + Object min, + Object max, + long approximateRowCnt, + int chunkSize, + int dynamicChunkSize) { LOG.info( - "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}", + "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", tableId, approximateRowCnt, - chunkSize); + chunkSize, + dynamicChunkSize); if (approximateRowCnt <= chunkSize) { // there is no more than one chunk, return full table as a chunk return Collections.singletonList(ChunkRange.all()); @@ -214,12 +220,12 @@ private List splitEvenlySizedChunks( final List splits = new ArrayList<>(); Object chunkStart = null; - Object chunkEnd = ObjectUtils.plus(min, chunkSize); + Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); while (ObjectUtils.compare(chunkEnd, max) <= 0) { splits.add(ChunkRange.of(chunkStart, chunkEnd)); chunkStart = chunkEnd; try { - chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize); + chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); } catch (ArithmeticException e) { // Stop chunk split to avoid dead loop when number overflows. break; diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java index 444fd74a773..ed5de38f377 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java @@ -152,7 +152,7 @@ private List splitTableIntoChunks( // the minimum dynamic chunk size is at least 1 final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); return splitEvenlySizedChunks( - tableId, min, max, approximateRowCnt, dynamicChunkSize); + tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); } else { return splitUnevenlySizedChunks( jdbc, tableId, splitColumnName, min, max, chunkSize); @@ -167,13 +167,19 @@ private List splitTableIntoChunks( * and tumble chunks in step size. */ @VisibleForTesting - public List splitEvenlySizedChunks( - TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) { + private List splitEvenlySizedChunks( + TableId tableId, + Object min, + Object max, + long approximateRowCnt, + int chunkSize, + int dynamicChunkSize) { LOG.info( - "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}", + "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", tableId, approximateRowCnt, - chunkSize); + chunkSize, + dynamicChunkSize); if (approximateRowCnt <= chunkSize) { // there is no more than one chunk, return full table as a chunk return Collections.singletonList(ChunkRange.all()); @@ -181,12 +187,12 @@ public List splitEvenlySizedChunks( final List splits = new ArrayList<>(); Object chunkStart = null; - Object chunkEnd = ObjectUtils.plus(min, chunkSize); + Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); while (ObjectUtils.compare(chunkEnd, max) <= 0) { splits.add(ChunkRange.of(chunkStart, chunkEnd)); chunkStart = chunkEnd; try { - chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize); + chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); } catch (ArithmeticException e) { // Stop chunk split to avoid dead loop when number overflows. break; diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 6f125897458..889adc019e5 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -199,6 +199,19 @@ public void testAssignTableWithSparseDistributionSplitKey() { customerDatabase.getDatabaseName() + ".customers_sparse_dist" }); assertEquals(expected1, splits1); + + // test sparse table that the approximate row count is bigger than chunk size + List expected2 = + Arrays.asList("customers_sparse_dist null [18]", "customers_sparse_dist [18] null"); + List splits2 = + getTestAssignSnapshotSplits( + 8, + 10d, + SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] { + customerDatabase.getDatabaseName() + ".customers_sparse_dist" + }); + assertEquals(expected2, splits2); } @Test From 432b769f170c9cbc4c44a519b0a30594ca97bf0d Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Fri, 29 Jul 2022 16:30:24 +0800 Subject: [PATCH 2/2] fix test --- .../cdc/connectors/mysql/source/assigners/ChunkSplitter.java | 2 +- .../mysql/source/assigners/MySqlChunkSplitterTest.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java index ed5de38f377..7000de91646 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java @@ -167,7 +167,7 @@ private List splitTableIntoChunks( * and tumble chunks in step size. */ @VisibleForTesting - private List splitEvenlySizedChunks( + public List splitEvenlySizedChunks( TableId tableId, Object min, Object max, diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java index 7839fba28cd..f3681cbf386 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java @@ -35,6 +35,7 @@ public void testSplitEvenlySizedChunksOverflow() { Integer.MAX_VALUE - 19, Integer.MAX_VALUE, 20, + 10, 10); assertEquals(2, res.size()); assertEquals(ChunkRange.of(null, 2147483638), res.get(0)); @@ -50,6 +51,7 @@ public void testSplitEvenlySizedChunksNormal() { Integer.MAX_VALUE - 20, Integer.MAX_VALUE, 20, + 10, 10); assertEquals(3, res.size()); assertEquals(ChunkRange.of(null, 2147483637), res.get(0));