Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mysql sink may be blocked by network io wait #825

Merged
merged 8 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,22 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
}
}

func (s *mysqlSink) notifyAndWaitExec() {
func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) {
s.notifier.Notify()
for _, w := range s.workers {
w.waitAllTxnsExecuted()
done := make(chan struct{})
go func() {
for _, w := range s.workers {
w.waitAllTxnsExecuted()
}
close(done)
}()
// This is a hack code to avoid io wait in some routine blocks others to exit.
// As the network io wait is blocked in kernel code, the goroutine is in a
// D-state that we could not even stop it by cancel the context. So if this
// scenario happens, the blocked goroutine will be leak.
select {
case <-ctx.Done():
case <-done:
}
}

Expand Down Expand Up @@ -463,7 +475,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model
sendFn(txn, idx)
return
}
s.notifyAndWaitExec()
s.notifyAndWaitExec(ctx)
causality.reset()
}
sendFn(txn, rowsChIdx)
Expand All @@ -477,7 +489,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model
s.metricConflictDetectDurationHis.Observe(time.Since(startTime).Seconds())
}
}
s.notifyAndWaitExec()
s.notifyAndWaitExec(ctx)
}

type mysqlSinkWorker struct {
Expand Down Expand Up @@ -609,6 +621,9 @@ func (s *mysqlSink) execDMLWithMaxRetries(
failpoint.Inject("MySQLSinkTxnRandomError", func() {
failpoint.Return(checkTxnErr(errors.Trace(dmysql.ErrInvalidConn)))
})
failpoint.Inject("MySQLSinkHangLongTime", func() {
time.Sleep(time.Hour)
})
err := s.statistics.RecordBatchExecution(func() (int, error) {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
Expand Down Expand Up @@ -691,6 +706,9 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
}

func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, replicaID uint64, bucket int) error {
failpoint.Inject("MySQLSinkExecDMLError", func() {
failpoint.Return(errors.Trace(dmysql.ErrInvalidConn))
})
dmls, err := s.prepareDMLs(rows, replicaID, bucket)
if err != nil {
return errors.Trace(err)
Expand Down
27 changes: 27 additions & 0 deletions tests/sink_hang/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# diff Configuration.

log-level = "info"
chunk-size = 10
check-thread-count = 4
sample-percent = 100
use-rowid = false
use-checksum = true
fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "sink_hang"
tables = ["~t.*"]

[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
70 changes: 70 additions & 0 deletions tests/sink_hang/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#!/bin/bash

set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

CDC_COUNT=3
DB_COUNT=4
MAX_RETRIES=10

function check_changefeed_state() {
pd_addr=$1
changefeed_id=$2
expected=$3
state=$(cdc cli --pd=$pd_addr changefeed query -s -c $changefeed_id|jq -r ".state")
if [[ "$state" != "$expected" ]];then
echo "unexpected state $state, expected $expected"
exit 1
fi
}

export -f check_changefeed_state

function run() {
# kafka is not supported yet.
if [ "$SINK_TYPE" == "kafka" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

pd_addr="http://$UP_PD_HOST:$UP_PD_PORT"
TOPIC_NAME="ticdc-sink-hang-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
*) SINK_URI="mysql://[email protected]:3306/?max-txn-row=1";;
esac
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi

export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkHangLongTime=1*return(true);github.com/pingcap/ticdc/cdc/sink/MySQLSinkExecDMLError=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

run_sql "CREATE DATABASE sink_hang;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table sink_hang.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table sink_hang.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "BEGIN; INSERT INTO sink_hang.t1 VALUES (),(),(); INSERT INTO sink_hang.t2 VALUES (),(),(); COMMIT" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped"
cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal"

check_table_exists "sink_hang.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "sink_hang.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"