diff --git a/deployments/ticdc/docker-compose/docker-compose-kafka-integration.yml b/deployments/ticdc/docker-compose/docker-compose-kafka-integration.yml
index 2e794be09db..d237bf12bcf 100644
--- a/deployments/ticdc/docker-compose/docker-compose-kafka-integration.yml
+++ b/deployments/ticdc/docker-compose/docker-compose-kafka-integration.yml
@@ -11,8 +11,9 @@ services:
     ports:
      - "9092:9092"
     environment:
-      KAFKA_MESSAGE_MAX_BYTES: 1073741824
-      KAFKA_REPLICA_FETCH_MAX_BYTES: 1073741824
+      KAFKA_MESSAGE_MAX_BYTES: 11534336
+      KAFKA_REPLICA_FETCH_MAX_BYTES: 11534336
+      KAFKA_CREATE_TOPICS: "big-message-test:1:1"
       KAFKA_BROKER_ID: 1
       RACK_COMMAND: "curl -sfL https://git.io/JJZXX -o /tmp/kafka.server.keystore.jks && curl -sfL https://git.io/JJZXM -o /tmp/kafka.server.truststore.jks"
       KAFKA_LISTENERS: "SSL://127.0.0.1:9093,PLAINTEXT://127.0.0.1:9092"
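Note that 11534336 bytes is 11 MiB (11 * 1024 * 1024), so the broker now rejects any single message above that size, and KAFKA_CREATE_TOPICS pre-creates the topic the new test consumes. Assuming the wurstmeister/kafka entrypoint conventions (the `name:partitions:replicas` format suggests that image), the new entry is roughly equivalent to running:

    kafka-topics.sh --create --topic big-message-test \
        --partitions 1 --replication-factor 1 \
        --bootstrap-server 127.0.0.1:9092   # Kafka >= 2.2; older brokers take --zookeeper instead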
diff --git a/tests/integration_tests/kafka_big_messages/conf/diff_config.toml b/tests/integration_tests/kafka_big_messages/conf/diff_config.toml
new file mode 100644
index 00000000000..0082a370281
--- /dev/null
+++ b/tests/integration_tests/kafka_big_messages/conf/diff_config.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+output-dir = "/tmp/tidb_cdc_test/kafka_big_messages/sync_diff/output"
+
+source-instances = ["mysql1"]
+
+target-instance = "tidb0"
+
+target-check-tables = ["kafka_big_messages.test"]
+
+[data-sources]
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 4000
+user = "root"
+password = ""
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = ""
diff --git a/tests/integration_tests/kafka_big_messages/run.sh b/tests/integration_tests/kafka_big_messages/run.sh
new file mode 100755
index 00000000000..0b10c780c43
--- /dev/null
+++ b/tests/integration_tests/kafka_big_messages/run.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+
+set -e
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+function run() {
+	# This case tests the kafka sink only.
+	if [ "$SINK_TYPE" == "mysql" ]; then
+		return
+	fi
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	# Record the tso before we create tables, to skip the system table DDLs.
+	start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	# Use a max-message-bytes value larger than the Kafka topic's max message bytes,
+	# to verify that TiCDC automatically falls back to the topic's limit.
+	# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L178
+	# Use a topic that has already been created.
+	# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L180
+	SINK_URI="kafka://127.0.0.1:9092/big-message-test?protocol=open-protocol&partition-num=1&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
+	cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
+	run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/big-message-test?protocol=open-protocol&partition-num=1&version=${KAFKA_VERSION}"
+
+	echo "Starting to generate big Kafka messages..."
+	cd $CUR/../../utils/gen_kafka_big_messages
+	if [ ! -f ./gen_kafka_big_messages ]; then
+		GO111MODULE=on go build
+	fi
+	# Generate more data than the Kafka broker's max.message.bytes; it should still be replicated correctly.
+	./gen_kafka_big_messages --row-count=15 --sql-file-path=$CUR/test.sql
+
+	run_sql_file $CUR/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	table="kafka_big_messages.test"
+	check_table_exists $table ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/kafka_messages/conf/diff_config.toml b/tests/integration_tests/kafka_messages/conf/diff_config.toml
index f4a6d29c149..f471166e80c 100644
--- a/tests/integration_tests/kafka_messages/conf/diff_config.toml
+++ b/tests/integration_tests/kafka_messages/conf/diff_config.toml
@@ -7,7 +7,7 @@ export-fix-sql = true
 check-struct-only = false
 
 [task]
-output-dir = "/tmp/tidb_cdc_test/kafka_message/sync_diff/output"
+output-dir = "/tmp/tidb_cdc_test/kafka_messages/sync_diff/output"
 
 source-instances = ["mysql1"]
diff --git a/tests/integration_tests/kafka_messages/run.sh b/tests/integration_tests/kafka_messages/run.sh
index 1fb75b5adfa..5f89c04b421 100755
--- a/tests/integration_tests/kafka_messages/run.sh
+++ b/tests/integration_tests/kafka_messages/run.sh
@@ -31,10 +31,13 @@ function run_length_limit() {
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"
 
 	TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
-	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
+	# Use a max-message-bytes value larger than the Kafka broker's max message bytes,
+	# to verify that TiCDC automatically falls back to the broker's limit.
+	# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L178
+	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}"
 	fi
 
 	# Add a check table to reduce check time, or if we check data with sync diff
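For context on the numbers above: both changefeeds now ask for max-message-bytes=12582912 (12 MiB) while the broker caps messages at 11534336 (11 MiB), so these tests only pass if TiCDC notices the topic/broker limit and shrinks its batches accordingly. A quick way to double-check the effective broker limit against a running broker (a sketch, assuming the stock Kafka CLI tools are on PATH):

    kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
        --entity-type brokers --entity-name 1 \
        --describe --all | grep 'message.max.bytes'   # --all needs Kafka >= 2.5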
diff --git a/tests/utils/gen_kafka_big_messages/main.go b/tests/utils/gen_kafka_big_messages/main.go
new file mode 100644
index 00000000000..f3675f44743
--- /dev/null
+++ b/tests/utils/gen_kafka_big_messages/main.go
@@ -0,0 +1,145 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"strings"
+
+	"github.com/pingcap/errors"
+)
+
+// See: https://docs.pingcap.com/tidb/stable/tidb-limitations/#limitations-on-string-types
+const varcharColumnMaxLen = 16383
+
+// Value of each column. Defined as a variable so that tests can override it.
+var colValue = strings.Repeat("a", varcharColumnMaxLen)
+
+type options struct {
+	// The size of each row.
+	// The default is 1MiB.
+	// FIXME: The row size is not controlled precisely yet;
+	// the per-statement overhead should be accounted for exactly.
+	rowBytes int
+	// Total number of rows.
+	// The default is 1 row.
+	rowCount int
+	// SQL file path.
+	// The default is `./test.sql`.
+	sqlFilePath string
+	// Database name.
+	// The default is `kafka_big_messages`.
+	databaseName string
+	// Table name.
+	// The default is `test`.
+	tableName string
+}
+
+func (o *options) validate() error {
+	if o.rowBytes <= 0 {
+		return errors.New("rowBytes must be greater than zero")
+	}
+
+	if o.rowCount <= 0 {
+		return errors.New("rowCount must be greater than zero")
+	}
+
+	if o.sqlFilePath == "" {
+		return errors.New("please specify the correct file path")
+	}
+
+	if o.databaseName == "" {
+		return errors.New("please specify the database name")
+	}
+
+	if o.tableName == "" {
+		return errors.New("please specify the table name")
+	}
+
+	return nil
+}
+
+func gatherOptions() *options {
+	o := &options{}
+
+	fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
+	fs.IntVar(&o.rowBytes, "row-bytes", 1024*1024, "Number of bytes per row.")
+	fs.IntVar(&o.rowCount, "row-count", 1, "Count of rows.")
+	fs.StringVar(&o.sqlFilePath, "sql-file-path", "./test.sql", "SQL file path.")
+	fs.StringVar(&o.databaseName, "database-name", "kafka_big_messages", "Database name.")
+	fs.StringVar(&o.tableName, "table-name", "test", "Table name.")
+
+	_ = fs.Parse(os.Args[1:])
+	return o
+}
+
+func main() {
+	o := gatherOptions()
+	if err := o.validate(); err != nil {
+		log.Panicf("Invalid options: %v", err)
+	}
+
+	file, err := os.OpenFile(o.sqlFilePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
+	if err != nil {
+		log.Panicf("Open SQL file failed: %v", err)
+	}
+
+	_, err = file.Write([]byte(genDatabaseSql(o.databaseName)))
+	if err != nil {
+		log.Panicf("Write create database SQL failed: %v", err)
+	}
+
+	_, err = file.Write([]byte(genCreateTableSql(o.rowBytes, o.tableName)))
+	if err != nil {
+		log.Panicf("Write create table SQL failed: %v", err)
+	}
+
+	for i := 0; i < o.rowCount; i++ {
+		_, err = file.Write([]byte(genInsertSql(o.rowBytes, o.tableName, i)))
+		if err != nil {
+			log.Panicf("Write insert SQL failed: %v", err)
+		}
+	}
+}
+
+func genDatabaseSql(databaseName string) string {
+	return fmt.Sprintf(`DROP DATABASE IF EXISTS %s;
+CREATE DATABASE %s;
+USE %s;
+
+`, databaseName, databaseName, databaseName)
+}
+
+func genCreateTableSql(rowBytes int, tableName string) string {
+	var cols string
+
+	for i := 0; i < rowBytes/varcharColumnMaxLen; i++ {
+		cols = fmt.Sprintf("%s, a%d VARCHAR(%d)", cols, i, varcharColumnMaxLen)
+	}
+
+	return fmt.Sprintf("CREATE TABLE %s(id int primary key %s);\n", tableName, cols)
+}
+
+func genInsertSql(rowBytes int, tableName string, id int) string {
+	var values string
+
+	for i := 0; i < rowBytes/varcharColumnMaxLen; i++ {
+		values = fmt.Sprintf("%s, '%s'", values, colValue)
+	}
+
+	return fmt.Sprintf("INSERT INTO %s VALUES (%d%s);\n", tableName, id, values)
+}
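To make the generator's sizing concrete: with the default --row-bytes=1048576, it emits 1048576 / 16383 = 64 VARCHAR(16383) columns, so every INSERT carries about 64 * 16383 ≈ 1 MiB of payload, and the 15 rows used by run.sh add up to roughly 15 MiB. A sketch of a manual run (paths illustrative, output elided):

    cd tests/utils/gen_kafka_big_messages
    GO111MODULE=on go build
    ./gen_kafka_big_messages --row-count=2 --sql-file-path=/tmp/test.sql
    # /tmp/test.sql then starts with:
    #   DROP DATABASE IF EXISTS kafka_big_messages;
    #   CREATE DATABASE kafka_big_messages;
    #   USE kafka_big_messages;
    #
    #   CREATE TABLE test(id int primary key , a0 VARCHAR(16383), ..., a63 VARCHAR(16383));
    #   INSERT INTO test VALUES (0, 'aaa...', ...);
    #   INSERT INTO test VALUES (1, 'aaa...', ...);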
diff --git a/tests/utils/gen_kafka_big_messages/main_test.go b/tests/utils/gen_kafka_big_messages/main_test.go
new file mode 100644
index 00000000000..504829d8f7d
--- /dev/null
+++ b/tests/utils/gen_kafka_big_messages/main_test.go
@@ -0,0 +1,120 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestValidateOptions(t *testing.T) {
+	testCases := []struct {
+		o           *options
+		expectedErr string
+	}{
+		{
+			&options{
+				rowBytes: 0,
+			},
+			".*rowBytes must be greater than zero.*",
+		},
+		{
+			&options{
+				rowBytes: 1024 * 1024,
+				rowCount: 0,
+			},
+			".*rowCount must be greater than zero.*",
+		},
+		{
+			&options{
+				rowBytes:    1024 * 1024,
+				rowCount:    1,
+				sqlFilePath: "",
+			},
+			".*please specify the correct file path.*",
+		},
+		{
+			&options{
+				rowBytes:     1024 * 1024,
+				rowCount:     1,
+				sqlFilePath:  "./test.sql",
+				databaseName: "",
+			},
+			".*please specify the database name.*",
+		},
+		{
+			&options{
+				rowBytes:     1024 * 1024,
+				rowCount:     1,
+				sqlFilePath:  "./test.sql",
+				databaseName: "kafka-big-messages",
+				tableName:    "",
+			},
+			".*please specify the table name.*",
+		},
+		{
+			&options{
+				rowBytes:     1024 * 1024,
+				rowCount:     10,
+				sqlFilePath:  "./test.sql",
+				databaseName: "kafka-big-messages",
+				tableName:    "test",
+			},
+			"",
+		},
+	}
+
+	for _, tc := range testCases {
+		err := tc.o.validate()
+		if tc.expectedErr != "" {
+			require.Error(t, err)
+			require.Regexp(t, tc.expectedErr, err.Error())
+		} else {
+			require.NoError(t, err)
+		}
+	}
+}
+
+func TestGenDatabaseSql(t *testing.T) {
+	database := "test"
+
+	sql := genDatabaseSql(database)
+
+	require.Equal(t, "DROP DATABASE IF EXISTS test;\nCREATE DATABASE test;\nUSE test;\n\n", sql)
+}
+
+func TestGenCreateTableSql(t *testing.T) {
+	rowBytes := varcharColumnMaxLen
+	tableName := "test"
+
+	sql := genCreateTableSql(rowBytes, tableName)
+	require.Equal(t, "CREATE TABLE test(id int primary key , a0 VARCHAR(16383));\n", sql)
+}
+
+func TestGenInsertSql(t *testing.T) {
+	// Override the column value so the expected SQL stays short.
+	oldColValue := colValue
+	colValue = "a"
+	defer func() {
+		colValue = oldColValue
+	}()
+
+	rowBytes := varcharColumnMaxLen
+	tableName := "test"
+	id := 1
+
+	sql := genInsertSql(rowBytes, tableName, id)
+	require.Equal(t, "INSERT INTO test VALUES (1, 'a');\n", sql)
+}
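The unit tests are self-contained (TestGenInsertSql swaps colValue for a single 'a' so the expected strings stay readable) and can be run straight from the tool's directory:

    cd tests/utils/gen_kafka_big_messages
    go test ./...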