From 65f6e965f5f7bb6cb93d3931bacb3b6b90af507c Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 5 Aug 2020 19:02:20 +0800 Subject: [PATCH 1/7] Fix mysql sink may be blocked by network io wait --- cdc/sink/mysql.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index e7978a1e5a1..c2653c9a67a 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -420,10 +420,22 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) { } } -func (s *mysqlSink) notifyAndWaitExec() { +func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) { s.notifier.Notify() - for _, w := range s.workers { - w.waitAllTxnsExecuted() + done := make(chan struct{}, 1) + go func() { + for _, w := range s.workers { + w.waitAllTxnsExecuted() + } + done <- struct{}{} + }() + // This is a hack code to avoid io wait in some routine blocks others to exit. + // As the network io wait is blocked in kernel code, the goroutine is in a + // D-state that we could not even stop it by cancel the context. So if this + // scenario happens, the blocked goroutine will be leak. + select { + case <-ctx.Done(): + case <-done: } } @@ -463,7 +475,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model sendFn(txn, idx) return } - s.notifyAndWaitExec() + s.notifyAndWaitExec(ctx) causality.reset() } sendFn(txn, rowsChIdx) @@ -477,7 +489,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model s.metricConflictDetectDurationHis.Observe(time.Since(startTime).Seconds()) } } - s.notifyAndWaitExec() + s.notifyAndWaitExec(ctx) } type mysqlSinkWorker struct { From d9ef8186961909374273d10da0c2e54065ab3680 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 5 Aug 2020 19:02:20 +0800 Subject: [PATCH 2/7] add integration test --- cdc/sink/mysql.go | 3 ++ tests/sink_hang/conf/diff_config.toml | 27 ++++++++++++ tests/sink_hang/run.sh | 63 +++++++++++++++++++++++++++ 3 files changed, 93 insertions(+) create mode 100644 tests/sink_hang/conf/diff_config.toml create mode 100644 tests/sink_hang/run.sh diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index c2653c9a67a..226e0558862 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -621,6 +621,9 @@ func (s *mysqlSink) execDMLWithMaxRetries( failpoint.Inject("MySQLSinkTxnRandomError", func() { failpoint.Return(checkTxnErr(errors.Trace(dmysql.ErrInvalidConn))) }) + failpoint.Inject("MySQLSinkHangLongTime", func() { + time.Sleep(time.Hour) + }) err := s.statistics.RecordBatchExecution(func() (int, error) { tx, err := s.db.BeginTx(ctx, nil) if err != nil { diff --git a/tests/sink_hang/conf/diff_config.toml b/tests/sink_hang/conf/diff_config.toml new file mode 100644 index 00000000000..7b2de059d24 --- /dev/null +++ b/tests/sink_hang/conf/diff_config.toml @@ -0,0 +1,27 @@ +# diff Configuration. + +log-level = "info" +chunk-size = 10 +check-thread-count = 4 +sample-percent = 100 +use-rowid = false +use-checksum = true +fix-sql-file = "fix.sql" + +# tables need to check. +[[check-tables]] + schema = "sink_hang" + tables = ["usertable"] + +[[source-db]] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + instance-id = "source-1" + +[target-db] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/sink_hang/run.sh b/tests/sink_hang/run.sh new file mode 100644 index 00000000000..0fe23c12ba7 --- /dev/null +++ b/tests/sink_hang/run.sh @@ -0,0 +1,63 @@ +#!/bin/bash + +set -e + +CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 +MAX_RETRIES=10 + +function check_changefeed_state() { + pd_addr=$1 + changefeed_id=$2 + expected=$3 + state=$(cdc cli --pd=$pd_addr changefeed query -s -c $changefeed_id|jq -r ".state") + if [[ "$state" != "$expected" ]];then + echo "unexpected state $state, expected $expected" + exit 1 + fi +} + +export -f check_changefeed_state + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + + pd_addr="http://$UP_PD_HOST:$UP_PD_PORT" + TOPIC_NAME="ticdc-sink-hang-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + *) SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1";; + esac + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" + fi + + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkHangLongTime=1*return(true)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr + changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') + + run_sql "CREATE DATABASE sink_hang;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE table sink_hang.simple(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO sink_hang.simple VALUES (),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "failed" + cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr + ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal" + + check_table_exists "sink_hang.simple" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From 0321d2bf46d2c77ad1a2d8dbde07de3765ea180b Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 5 Aug 2020 19:39:08 +0800 Subject: [PATCH 3/7] fix integration test --- tests/sink_hang/conf/diff_config.toml | 2 +- tests/sink_hang/run.sh | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/sink_hang/conf/diff_config.toml b/tests/sink_hang/conf/diff_config.toml index 7b2de059d24..63d823e7fe1 100644 --- a/tests/sink_hang/conf/diff_config.toml +++ b/tests/sink_hang/conf/diff_config.toml @@ -11,7 +11,7 @@ fix-sql-file = "fix.sql" # tables need to check. [[check-tables]] schema = "sink_hang" - tables = ["usertable"] + tables = ["~t.*"] [[source-db]] host = "127.0.0.1" diff --git a/tests/sink_hang/run.sh b/tests/sink_hang/run.sh index 0fe23c12ba7..6ea37f00022 100644 --- a/tests/sink_hang/run.sh +++ b/tests/sink_hang/run.sh @@ -40,19 +40,21 @@ function run() { run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" fi - export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkHangLongTime=1*return(true)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkHangLongTime=1*return(true);github.com/pingcap/ticdc/cdc/sink/MySQLSinkTxnRandomError=1*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') run_sql "CREATE DATABASE sink_hang;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "CREATE table sink_hang.simple(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "INSERT INTO sink_hang.simple VALUES (),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE table sink_hang.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE table sink_hang.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO sink_hang.t1.VALUES (),(),();INSERT INTO sink_hang.t2.VALUES (),(),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT} ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "failed" cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal" - check_table_exists "sink_hang.simple" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "sink_hang.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "sink_hang.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml cleanup_process $CDC_BINARY From 699166027570f48fdd12ad54d48b8d21baa49738 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 5 Aug 2020 19:46:31 +0800 Subject: [PATCH 4/7] fix integration test --- cdc/sink/mysql.go | 3 +++ tests/sink_hang/run.sh | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 226e0558862..2632efd4fda 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -706,6 +706,9 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64, } func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, replicaID uint64, bucket int) error { + failpoint.Inject("MySQLSinkExecDMLError", func() { + failpoint.Return(errors.Trace(dmysql.ErrInvalidConn)) + }) dmls, err := s.prepareDMLs(rows, replicaID, bucket) if err != nil { return errors.Trace(err) diff --git a/tests/sink_hang/run.sh b/tests/sink_hang/run.sh index 6ea37f00022..4d102f98fe0 100644 --- a/tests/sink_hang/run.sh +++ b/tests/sink_hang/run.sh @@ -40,14 +40,14 @@ function run() { run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" fi - export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkHangLongTime=1*return(true);github.com/pingcap/ticdc/cdc/sink/MySQLSinkTxnRandomError=1*return(true)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkHangLongTime=1*return(true);github.com/pingcap/ticdc/cdc/sink/MySQLSinkExecDMLError=1*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') run_sql "CREATE DATABASE sink_hang;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table sink_hang.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table sink_hang.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "INSERT INTO sink_hang.t1.VALUES (),(),();INSERT INTO sink_hang.t2.VALUES (),(),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "BEGIN; INSERT INTO sink_hang.t1 VALUES (),(),();INSERT INTO sink_hang.t2 VALUES (),(),();COMMIT" ${UP_TIDB_HOST} ${UP_TIDB_PORT} ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "failed" cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr From 05141b5eaeda296737259bd4c03550a7a2840eec Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 5 Aug 2020 19:55:09 +0800 Subject: [PATCH 5/7] fix integration test --- tests/sink_hang/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sink_hang/run.sh b/tests/sink_hang/run.sh index 4d102f98fe0..9fc543bb3ae 100644 --- a/tests/sink_hang/run.sh +++ b/tests/sink_hang/run.sh @@ -47,9 +47,9 @@ function run() { run_sql "CREATE DATABASE sink_hang;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table sink_hang.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table sink_hang.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "BEGIN; INSERT INTO sink_hang.t1 VALUES (),(),();INSERT INTO sink_hang.t2 VALUES (),(),();COMMIT" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "BEGIN; INSERT INTO sink_hang.t1 VALUES (),(),(); INSERT INTO sink_hang.t2 VALUES (),(),(); COMMIT" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "failed" + ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped" cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal" From be3fac4074e0c3a7fb331a979821c73013254ba3 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 6 Aug 2020 10:54:07 +0800 Subject: [PATCH 6/7] address comment --- cdc/sink/mysql.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 2632efd4fda..d39f0b8b0ea 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -422,12 +422,12 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) { func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) { s.notifier.Notify() - done := make(chan struct{}, 1) + done := make(chan struct{}) go func() { for _, w := range s.workers { w.waitAllTxnsExecuted() } - done <- struct{}{} + close(done) }() // This is a hack code to avoid io wait in some routine blocks others to exit. // As the network io wait is blocked in kernel code, the goroutine is in a From d69122b83ede59655eaf28cec9d4eca0f1e5133b Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 6 Aug 2020 12:26:52 +0800 Subject: [PATCH 7/7] skip kafka in sink_hang test --- tests/sink_hang/run.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/sink_hang/run.sh b/tests/sink_hang/run.sh index 9fc543bb3ae..bdfb7e6a3a0 100644 --- a/tests/sink_hang/run.sh +++ b/tests/sink_hang/run.sh @@ -26,6 +26,11 @@ function check_changefeed_state() { export -f check_changefeed_state function run() { + # kafka is not supported yet. + if [ "$SINK_TYPE" == "kafka" ]; then + return + fi + rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR cd $WORK_DIR