tests(ticdc): add tests for kafka max-message-bytes #4125

Merged 5 commits on Dec 30, 2021
@@ -11,8 +11,9 @@ services:
     ports:
       - "9092:9092"
     environment:
-      KAFKA_MESSAGE_MAX_BYTES: 1073741824
-      KAFKA_REPLICA_FETCH_MAX_BYTES: 1073741824
+      KAFKA_MESSAGE_MAX_BYTES: 11534336
+      KAFKA_REPLICA_FETCH_MAX_BYTES: 11534336
+      KAFKA_CREATE_TOPICS: "big-message-test:1:1"
       KAFKA_BROKER_ID: 1
       RACK_COMMAND: "curl -sfL https://git.io/JJZXX -o /tmp/kafka.server.keystore.jks && curl -sfL https://git.io/JJZXM -o /tmp/kafka.server.truststore.jks"
       KAFKA_LISTENERS: "SSL://127.0.0.1:9093,PLAINTEXT://127.0.0.1:9092"
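
For context: KAFKA_MESSAGE_MAX_BYTES and KAFKA_REPLICA_FETCH_MAX_BYTES set the broker's message.max.bytes and replica.fetch.max.bytes, and 11534336 bytes is 11 MiB, deliberately smaller than the 12 MiB max-message-bytes the new tests pass in the sink URI. Below is a minimal sketch of reading the broker-side limit back, assuming the Shopify/sarama client (which TiCDC itself uses); the address, broker version, and this little program are placeholders, not part of the PR:

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0 // assumption: any version the test broker speaks

	admin, err := sarama.NewClusterAdmin([]string{"127.0.0.1:9092"}, cfg)
	if err != nil {
		log.Fatalf("create cluster admin failed: %v", err)
	}
	defer admin.Close()

	// Ask broker 1 (KAFKA_BROKER_ID above) for the limit that
	// KAFKA_MESSAGE_MAX_BYTES controls.
	entries, err := admin.DescribeConfig(sarama.ConfigResource{
		Type:        sarama.BrokerResource,
		Name:        "1",
		ConfigNames: []string{"message.max.bytes"},
	})
	if err != nil {
		log.Fatalf("describe broker config failed: %v", err)
	}
	for _, e := range entries {
		fmt.Printf("%s = %s\n", e.Name, e.Value) // expect 11534336
	}
}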
29 changes: 29 additions & 0 deletions tests/integration_tests/kafka_big_messages/conf/diff_config.toml
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/kafka_big_messages/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["kafka_big_messages.test"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
55 changes: 55 additions & 0 deletions tests/integration_tests/kafka_big_messages/run.sh
@@ -0,0 +1,55 @@
#!/bin/bash

set -e

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
# test kafka sink only in this case
if [ "$SINK_TYPE" == "mysql" ]; then
return
fi
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

# Use a max-message-bytes parameter that is larger than the kafka topic max message bytes.
# Test if TiCDC automatically uses the max-message-bytes of the topic.
# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L178
# Use a topic that has already been created.
# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L180
SINK_URI="kafka://127.0.0.1:9092/big-message-test?protocol=open-protocol&partition-num=1&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/big-message-test?protocol=open-protocol&partition-num=1&version=${KAFKA_VERSION}"

echo "Starting generate kafka big messages..."
cd $CUR/../../utils/gen_kafka_big_messages
if [ ! -f ./gen_kafka_big_messages ]; then
GO111MODULE=on go build
fi
# Generate data larger than the kafka broker's max.message.bytes; TiCDC should still send it correctly.
./gen_kafka_big_messages --row-count=15 --sql-file-path=$CUR/test.sql

run_sql_file $CUR/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
table="kafka_big_messages.test"
check_table_exists $table ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
2 changes: 1 addition & 1 deletion tests/integration_tests/kafka_messages/conf/diff_config.toml
@@ -7,7 +7,7 @@ export-fix-sql = true
check-struct-only = false

[task]
-output-dir = "/tmp/tidb_cdc_test/kafka_message/sync_diff/output"
+output-dir = "/tmp/tidb_cdc_test/kafka_messages/sync_diff/output"

source-instances = ["mysql1"]

7 changes: 5 additions & 2 deletions tests/integration_tests/kafka_messages/run.sh
@@ -31,10 +31,13 @@ function run_length_limit() {
 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"

 TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
-SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
+# Use a max-message-bytes parameter that is larger than the kafka broker max message bytes.
+# Test if TiCDC automatically uses the max-message-bytes of the broker.
+# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L178
+SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
 run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
 if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}"
 fi

 # Add a check table to reduce check time, or if we check data with sync diff
145 changes: 145 additions & 0 deletions tests/utils/gen_kafka_big_messages/main.go
@@ -0,0 +1,145 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"
"fmt"
"log"
"os"
"strings"

"github.com/pingcap/errors"
)

// See: https://docs.pingcap.com/tidb/stable/tidb-limitations/#limitations-on-string-types
const varcharColumnMaxLen = 16383

// Value of each column. Defined as a variable so tests can override it.
var colValue = strings.Repeat("a", varcharColumnMaxLen)

type options struct {
// The size of each row.
// The default is 1MiB.
// FIXME: Currently it does not have precise control over the size of each row.
// The overhead needs to be calculated and processed accurately.
rowBytes int
// Total number of rows.
// The default is 1 row.
rowCount int
// SQL file path.
// The default is `./test.sql`.
sqlFilePath string
// Database name.
// The default is `kafka_big_messages`.
databaseName string
// Table name.
// The default is `test`.
tableName string
}

func (o *options) validate() error {
if o.rowBytes <= 0 {
return errors.New("rowBytes must be greater than zero")
}

if o.rowCount <= 0 {
return errors.New("rowCount must be greater than zero")
}

if o.sqlFilePath == "" {
return errors.New("please specify the correct file path")
}

if o.databaseName == "" {
return errors.New("please specify the database name")
}

if o.tableName == "" {
return errors.New("please specify the table name")
}

return nil
}

func gatherOptions() *options {
o := &options{}

fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
fs.IntVar(&o.rowBytes, "row-bytes", 1024*1024, "Number of bytes per row.")
fs.IntVar(&o.rowCount, "row-count", 1, "Count of rows.")
fs.StringVar(&o.sqlFilePath, "sql-file-path", "./test.sql", "Sql file path.")
fs.StringVar(&o.databaseName, "database-name", "kafka_big_messages", "Database name.")
fs.StringVar(&o.tableName, "table-name", "test", "Table name.")

_ = fs.Parse(os.Args[1:])
return o
}

func main() {
o := gatherOptions()
if err := o.validate(); err != nil {
log.Panicf("Invalid options: %v", err)
}

file, err := os.OpenFile(o.sqlFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
log.Panicf("Open sql file failed: %v", err)
}

_, err = file.Write([]byte(genDatabaseSql(o.databaseName)))
if err != nil {
log.Panicf("Wirte create database sql failed: %v", err)
}

_, err = file.Write([]byte(genCreateTableSql(o.rowBytes, o.tableName)))
if err != nil {
log.Panicf("Wirte create table sql failed: %v", err)
}

for i := 0; i < o.rowCount; i++ {
_, err = file.Write([]byte(genInsertSql(o.rowBytes, o.tableName, i)))
if err != nil {
log.Panicf("Wirte insert sql failed: %v", err)
}
}
}

func genDatabaseSql(databaseName string) string {
return fmt.Sprintf(`DROP DATABASE IF EXISTS %s;
CREATE DATABASE %s;
USE %s;

`, databaseName, databaseName, databaseName)
}

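// genCreateTableSql builds a CREATE TABLE statement with an int primary key
// plus rawBytes/varcharColumnMaxLen VARCHAR(16383) columns, so one fully
// populated row occupies roughly rawBytes bytes.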
func genCreateTableSql(rawBytes int, tableName string) string {
var cols string

for i := 0; i < rawBytes/varcharColumnMaxLen; i++ {
cols = fmt.Sprintf("%s, a%d VARCHAR(%d)", cols, i, varcharColumnMaxLen)
}

return fmt.Sprintf("CREATE TABLE %s(id int primary key %s);\n", tableName, cols)
}

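// genInsertSql builds an INSERT statement that fills every VARCHAR column
// with colValue, yielding a row of roughly rawBytes bytes.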
func genInsertSql(rawBytes int, tableName string, id int) string {
var values string

for i := 0; i < rawBytes/varcharColumnMaxLen; i++ {
values = fmt.Sprintf("%s, '%s'", values, colValue)
}

return fmt.Sprintf("INSERT INTO %s VALUES (%d%s);\n", tableName, id, values)
}
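
As invoked in kafka_big_messages/run.sh above (--row-count=15 with the default 1 MiB --row-bytes), this generator emits the DROP/CREATE/USE preamble followed by 15 INSERT statements, each spanning 1048576/16383 = 64 VARCHAR(16383) columns, i.e. roughly 1 MiB per row.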
121 changes: 121 additions & 0 deletions tests/utils/gen_kafka_big_messages/main_test.go
@@ -0,0 +1,121 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestValidateOptions(t *testing.T) {
testCases := []struct {
o *options
expectedErr string
}{
{
&options{
rowBytes: 0,
},
".*rowBytes must be greater than zero.*",
},
{
&options{
rowBytes: 1024 * 1024,
rowCount: 0,
},
".*rowCount must be greater than zero.*",
},
{
&options{
rowBytes: 1024 * 1024,
rowCount: 1,
sqlFilePath: "",
},
".*please specify the correct file path.*",
},
{
&options{
rowBytes: 1024 * 1024,
rowCount: 1,
sqlFilePath: "./test.sql",
databaseName: "",
},
".*please specify the database name.*",
},
{
&options{
rowBytes: 1024 * 1024,
rowCount: 1,
sqlFilePath: "./test.sql",
databaseName: "kafka-big-messages",
tableName: "",
},
".*please specify the table name.*",
},
{
&options{
rowBytes: 1024 * 1024,
rowCount: 10,
sqlFilePath: "./test.sql",
databaseName: "kafka-big-messages",
tableName: "test",
},
"",
},
}

for _, tc := range testCases {
err := tc.o.validate()
if tc.expectedErr != "" {
require.Error(t, err)
require.Regexp(t, tc.expectedErr, err.Error())
} else {
require.Nil(t, err)
}
}
}

func TestGenDatabaseSql(t *testing.T) {
database := "test"

sql := genDatabaseSql(database)

require.Equal(t, "DROP DATABASE IF EXISTS test;\nCREATE DATABASE test;\nUSE test;\n\n", sql)
}

func TestGenCreateTableSql(t *testing.T) {
rawBytes := varcharColumnMaxLen
tableName := "test"

sql := genCreateTableSql(rawBytes, tableName)
require.Equal(t, "CREATE TABLE test(id int primary key , a0 VARCHAR(16383));\n", sql)
}

func TestGenInsertSql(t *testing.T) {
// Override the col value to test.
oldColValue := colValue
colValue = "a"
defer func() {
colValue = oldColValue
}()

rawBytes := varcharColumnMaxLen
tableName := "test"
id := 1

sql := genInsertSql(rawBytes, tableName, id)
require.Equal(t, "INSERT INTO test VALUES (1, 'a');\n", sql)
}