From 647bbd1265cb1ccb070ef4fbd16e6fdac7ba010c Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Mon, 24 Jul 2023 15:21:01 +0800 Subject: [PATCH 1/3] [WIP][TEST] Locate the problem of e2e instability --- .../cdc/mongodb/source/fetch/MongodbScanFetchTask.java | 1 + .../src/test/resources/log4j2.properties | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java index d59ecfe3646..79f06e905c2 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java @@ -81,6 +81,7 @@ public MongodbScanFetchTask(SnapshotSplit snapshotSplit) { @Override public void execute(Context context) throws Exception { + log.info("============================================="); MongodbFetchTaskContext taskContext = (MongodbFetchTaskContext) context; MongodbSourceConfig sourceConfig = taskContext.getSourceConfig(); MongodbDialect dialect = taskContext.getDialect(); diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties index fc0c2063b7a..7b9977bd6ea 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties @@ -16,10 +16,10 @@ # limitations under the License. ################################################################################ -rootLogger.level = WARN +rootLogger.level = INFO logger.zeta.name=org.apache.seatunnel.engine -logger.zeta.level=WARN +logger.zeta.level=INFO logger.debezium.name=io.debezium.connector logger.debezium.level=WARN From db9d5ce9e831daa03f3613abfef01d86574b72c6 Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Mon, 24 Jul 2023 16:40:52 +0800 Subject: [PATCH 2/3] [Hotfix][Mongodb cdc] Solve the problem that mongodb cdc startup resume token is negative --- .../source/dialect/MongodbDialect.java | 7 +++ .../source/fetch/MongodbScanFetchTask.java | 1 - .../cdc/mongodb/utils/ResumeToken.java | 46 ++++++++----------- .../test/java/mongodb/MongoDBContainer.java | 1 + .../src/test/resources/log4j2-test.properties | 4 +- .../src/test/resources/log4j2.properties | 10 ++-- 6 files changed, 33 insertions(+), 36 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java index 11ef57ffc5f..25e463c17e5 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java @@ -34,6 +34,7 @@ import com.mongodb.client.MongoClient; import io.debezium.relational.TableId; +import lombok.extern.slf4j.Slf4j; import javax.annotation.Nonnull; @@ -52,6 +53,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getCurrentClusterTime; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getLatestResumeToken; +@Slf4j public class MongodbDialect implements DataSourceDialect { private final Map cache = @@ -137,6 +139,11 @@ public ChangeStreamOffset displayCurrentOffset(MongodbSourceConfig sourceConfig) ChangeStreamOffset changeStreamOffset; if (startupResumeToken != null) { changeStreamOffset = new ChangeStreamOffset(startupResumeToken); + log.info( + "startup resume token={},change stream offset={}", + startupResumeToken, + changeStreamOffset); + } else { changeStreamOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient)); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java index 79f06e905c2..d59ecfe3646 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java @@ -81,7 +81,6 @@ public MongodbScanFetchTask(SnapshotSplit snapshotSplit) { @Override public void execute(Context context) throws Exception { - log.info("============================================="); MongodbFetchTaskContext taskContext = (MongodbFetchTaskContext) context; MongodbSourceConfig sourceConfig = taskContext.getSourceConfig(); MongodbDialect dialect = taskContext.getDialect(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java index 3ddd2ccbb21..5ee8962bc53 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils; -import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException; - import org.bson.BsonDocument; import org.bson.BsonTimestamp; import org.bson.BsonValue; @@ -29,41 +27,33 @@ import java.nio.ByteOrder; import java.util.Objects; -import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT; - public class ResumeToken { private static final int K_TIMESTAMP = 130; - public static @Nonnull BsonTimestamp decodeTimestamp(BsonDocument resumeToken) { - Objects.requireNonNull(resumeToken, "Missing ResumeToken."); - BsonValue bsonValue = resumeToken.get("_data"); - byte[] keyStringBytes = extractKeyStringBytes(bsonValue); - validateKeyType(keyStringBytes); - - ByteBuffer buffer = ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN); - int t = buffer.getInt(); - int i = buffer.getInt(); - return new BsonTimestamp(t, i); - } - - private static byte[] extractKeyStringBytes(@Nonnull BsonValue bsonValue) { - if (bsonValue.isBinary()) { - return bsonValue.asBinary().getData(); - } else if (bsonValue.isString()) { - return hexToUint8Array(bsonValue.asString().getValue()); + public static BsonTimestamp decodeTimestamp(BsonDocument resumeToken) { + BsonValue bsonValue = + Objects.requireNonNull(resumeToken, "Missing ResumeToken.").get("_data"); + final byte[] keyStringBytes; + // Resume Tokens format: https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens + if (bsonValue.isBinary()) { // BinData + keyStringBytes = bsonValue.asBinary().getData(); + } else if (bsonValue.isString()) { // Hex-encoded string (v0 or v1) + keyStringBytes = hexToUint8Array(bsonValue.asString().getValue()); } else { - throw new MongodbConnectorException( - ILLEGAL_ARGUMENT, "Unknown resume token format: " + bsonValue); + throw new IllegalArgumentException( + "Unknown resume token format: " + resumeToken.toJson()); } - } - private static void validateKeyType(byte[] keyStringBytes) { - int kType = keyStringBytes[0] & 0xff; + ByteBuffer buffer = ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN); + int kType = buffer.get() & 0xff; if (kType != K_TIMESTAMP) { - throw new MongodbConnectorException( - ILLEGAL_ARGUMENT, "Unknown keyType of timestamp: " + kType); + throw new IllegalArgumentException("Unknown keyType of timestamp: " + kType); } + + int t = buffer.getInt(); + int i = buffer.getInt(); + return new BsonTimestamp(t, i); } private static byte[] hexToUint8Array(@Nonnull String str) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java index c33f6d047d4..a311bccc90b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java @@ -84,6 +84,7 @@ public MongoDBContainer(Network network, ShardingClusterRole clusterRole) { withExposedPorts(MONGODB_PORT); withCommand(ShardingClusterRole.startupCommand(clusterRole)); waitingFor(clusterRole.waitStrategy); + withEnv("TZ", "Asia/Shanghai"); } public void executeCommand(String command) { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties index f0090af0248..fe85e7bdb06 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties @@ -21,8 +21,8 @@ rootLogger.level = WARN rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender -logger.zeta.name=org.apache.seatunnel.engine -logger.zeta.level=WARN +logger.zeta.name = org.apache.seatunnel.engine +logger.zeta.level = WARN appender.consoleStdout.name = consoleStdoutAppender appender.consoleStdout.type = CONSOLE diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties index 7b9977bd6ea..3d028135e24 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties @@ -16,13 +16,13 @@ # limitations under the License. ################################################################################ -rootLogger.level = INFO +rootLogger.level = WARN -logger.zeta.name=org.apache.seatunnel.engine -logger.zeta.level=INFO +logger.zeta.name = org.apache.seatunnel.engine +logger.zeta.level = WARN -logger.debezium.name=io.debezium.connector -logger.debezium.level=WARN +logger.debezium.name = io.debezium.connector +logger.debezium.level = WARN ############################ log output to console ############################# rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender From acce0cc1edcdc9e1dfe7b9bb30e981df155c3142 Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Mon, 24 Jul 2023 22:59:54 +0800 Subject: [PATCH 3/3] fix --- .../src/test/resources/log4j2-test.properties | 4 ++-- .../src/test/resources/log4j2.properties | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties index fe85e7bdb06..f0090af0248 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties @@ -21,8 +21,8 @@ rootLogger.level = WARN rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender -logger.zeta.name = org.apache.seatunnel.engine -logger.zeta.level = WARN +logger.zeta.name=org.apache.seatunnel.engine +logger.zeta.level=WARN appender.consoleStdout.name = consoleStdoutAppender appender.consoleStdout.type = CONSOLE diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties index 3d028135e24..fc0c2063b7a 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties @@ -18,11 +18,11 @@ rootLogger.level = WARN -logger.zeta.name = org.apache.seatunnel.engine -logger.zeta.level = WARN +logger.zeta.name=org.apache.seatunnel.engine +logger.zeta.level=WARN -logger.debezium.name = io.debezium.connector -logger.debezium.level = WARN +logger.debezium.name=io.debezium.connector +logger.debezium.level=WARN ############################ log output to console ############################# rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender