Skip to content

Commit

Permalink
revert the CI test for the new owner and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored and ti-chi-bot committed Jun 22, 2021
1 parent be17909 commit cafc722
Show file tree
Hide file tree
Showing 19 changed files with 98 additions and 78 deletions.
14 changes: 14 additions & 0 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,20 @@ func (s *Server) handleChangefeedQuery(w http.ResponseWriter, req *http.Request)
tm := oracle.GetTimeFromTS(cfStatus.CheckpointTs)
resp.Checkpoint = tm.Format("2006-01-02 15:04:05.000")
}
if !config.NewReplicaImpl && cfStatus != nil {
switch cfStatus.AdminJobType {
case model.AdminNone, model.AdminResume:
if cfInfo != nil && cfInfo.Error != nil {
resp.FeedState = string(model.StateFailed)
}
case model.AdminStop:
resp.FeedState = string(model.StateStopped)
case model.AdminRemove:
resp.FeedState = string(model.StateRemoved)
case model.AdminFinish:
resp.FeedState = string(model.StateFinished)
}
}
writeData(w, resp)
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (
// ErrorHistoryThreshold represents failure upper limit in time window.
// Before a changefeed is initialized, check the the failure count of this
// changefeed, if it is less than ErrorHistoryThreshold, then initialize it.
ErrorHistoryThreshold = 3
ErrorHistoryThreshold = 5
)

// ChangeFeedInfo describes the detail of a ChangeFeed
Expand Down
11 changes: 5 additions & 6 deletions tests/availability/owner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ function test_owner_ha() {
test_hang_up_owner
test_expire_owner
test_owner_cleanup_stale_tasks
# FIXME: this test case should be owner crashed during task cleanup
# test_owner_cleanup_stale_tasks
test_owner_retryable_error
test_gap_between_watch_capture
}
Expand Down Expand Up @@ -159,8 +161,7 @@ function test_owner_cleanup_stale_tasks() {
function test_owner_retryable_error() {
echo "run test case test_owner_retryable_error"

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/capture-campaign-compacted-error=1*return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/capture/capture-campaign-compacted-error=1*return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/capture-campaign-compacted-error=1*return(true)'

# start a capture server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server1
Expand All @@ -172,8 +173,7 @@ function test_owner_retryable_error() {
echo "owner pid:" $owner_pid
echo "owner id" $owner_id

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner-run-with-error=1*return(true);github.com/pingcap/ticdc/cdc/capture-resign-failed=1*return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/owner-run-with-error=1*return(true);github.com/pingcap/ticdc/cdc/capture/capture-resign-failed=1*return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner-run-with-error=1*return(true);github.com/pingcap/ticdc/cdc/capture-resign-failed=1*return(true)'

# run another server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server2 --addr "127.0.0.1:8301"
Expand All @@ -198,8 +198,7 @@ function test_owner_retryable_error() {
function test_gap_between_watch_capture() {
echo "run test case test_gap_between_watch_capture"

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sleep-before-watch-capture=1*sleep(6000)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/sleep-in-owner-tick=1*sleep(6000)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sleep-before-watch-capture=1*sleep(6000)'

# start a capture server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_gap_between_watch_capture.server1
Expand Down
2 changes: 1 addition & 1 deletion tests/availability/processor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function test_processor_ha() {
function test_stop_processor() {
echo "run test case test_stop_processor"
# start a capture server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_stop_processor
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
# ensure the server become the owner
ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
Expand Down
18 changes: 6 additions & 12 deletions tests/changefeed_auto_stop/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,10 @@ function check_changefeed_state() {
endpoints=$1
changefeedid=$2
expected=$3
error_msg=$4
info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s)
echo "$info"
state=$(echo $info|jq -r '.state')
if [[ ! "$state" == "$expected" ]]; then
echo "changefeed state $state does not equal to $expected"
exit 1
fi
message=$(echo $info|jq -r '.error.message')
if [[ ! "$message" =~ "$error_msg" ]]; then
echo "error message '$message' is not as expected '$error_msg'"
output=$(cdc cli changefeed query --simple --changefeed-id $changefeedid --pd=http://$endpoints 2>&1)
state=$(echo $output | grep -oE "\"state\": \"[a-z]+\""|tr -d '" '|awk -F':' '{print $(NF)}')
if [ "$state" != "$expected" ]; then
echo "unexpected state $output, expected $expected"
exit 1
fi
}
Expand Down Expand Up @@ -59,8 +52,9 @@ function run() {
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
fi

ensure 20 check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ${changefeedid} "normal" "processor sync resolved injected error"
ensure 20 check_changefeed_state ${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "stopped"

cdc cli changefeed resume --changefeed-id=${changefeedid} --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}"
for i in $(seq $DB_COUNT); do
check_table_exists "changefeed_auto_stop_$i.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
done
Expand Down
32 changes: 15 additions & 17 deletions tests/changefeed_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
MAX_RETRIES=20
MAX_RETRIES=10

function check_changefeed_mark_error() {
function check_changefeed_mark_failed() {
endpoints=$1
changefeedid=$2
error_msg=$3
info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s)
echo "$info"
state=$(echo $info|jq -r '.state')
if [[ ! "$state" == "error" ]]; then
echo "changefeed state $state does not equal to error"
if [[ ! "$state" == "failed" ]]; then
echo "changefeed state $state does not equal to failed"
exit 1
fi
message=$(echo $info|jq -r '.error.message')
Expand Down Expand Up @@ -97,7 +97,7 @@ function check_no_capture() {
fi
}

export -f check_changefeed_mark_error
export -f check_changefeed_mark_failed
export -f check_changefeed_mark_failed_regex
export -f check_changefeed_mark_stopped_regex
export -f check_changefeed_mark_stopped
Expand All @@ -114,8 +114,7 @@ function run() {
start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedNoRetryError=1*return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/NewChangefeedNoRetryError=1*return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedNoRetryError=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

Expand All @@ -130,20 +129,21 @@ function run() {
fi

ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*"
cdc cli changefeed resume -c $changefeedid
changefeed_info=$(ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST_1}:${UP_PD_PORT_1} get /tidb/cdc/changefeed/info/${changefeedid}|tail -n 1)
new_info=$(echo $changefeed_info|sed 's/"state":"failed"/"state":"normal"/g')
ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST_1}:${UP_PD_PORT_1} put /tidb/cdc/changefeed/info/${changefeedid} "$new_info"

check_table_exists "changefeed_error.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedRetryError=return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/NewChangefeedRetryError=return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedRetryError=return(true)'
kill $capture_pid
ensure $MAX_RETRIES check_no_capture http://${UP_PD_HOST_1}:${UP_PD_PORT_1}
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
ensure $MAX_RETRIES check_changefeed_mark_error http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failpoint injected retriable error"
ensure $MAX_RETRIES check_changefeed_mark_failed http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failpoint injected retriable error"

cdc cli changefeed remove -c $changefeedid
ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1}
Expand All @@ -152,24 +152,22 @@ function run() {
cleanup_process $CDC_BINARY

# owner DDL error case
# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectChangefeedDDLError=return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/InjectChangefeedDDLError=return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectChangefeedDDLError=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
changefeedid_1=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

run_sql "CREATE table changefeed_error.DDLERROR(id int primary key, val int);"
ensure $MAX_RETRIES check_changefeed_mark_error http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_1} "[CDC:ErrExecDDLFailed]exec DDL failed"
ensure $MAX_RETRIES check_changefeed_mark_stopped http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_1} "[CDC:ErrExecDDLFailed]exec DDL failed"

cdc cli changefeed remove -c $changefeedid_1
cleanup_process $CDC_BINARY

# updating GC safepoint failure case
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/InjectActualGCSafePoint=return(9223372036854775807)' # new owner
# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectActualGCSafePoint=return(9223372036854775807)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectActualGCSafePoint=return(9223372036854775807)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

changefeedid_2=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "[CDC:ErrStartTsBeforeGC]"
ensure $MAX_RETRIES check_changefeed_mark_stopped_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "service safepoint lost"

cdc cli changefeed remove -c $changefeedid_2
export GO_FAILPOINTS=''
Expand Down
11 changes: 7 additions & 4 deletions tests/changefeed_finish/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ MAX_RETRIES=10
function check_changefeed_is_finished() {
pd=$1
changefeed=$2
query=$(cdc cli changefeed query -c=$changefeed)
echo "$query"
state=$(echo "$query"|jq ".state"|tr -d '"')
state=$(cdc cli changefeed query -s -c=$changefeed|jq ".state"|tr -d '"')
if [[ ! "$state" -eq "finished" ]]; then
echo "state $state is not finished"
exit 1
fi

info=$(cdc cli changefeed query -c=$changefeed|sed "/has been deleted/d"|jq ".info")
if [[ ! "$info" -eq "null" ]]; then
echo "unexpected changefeed info $info, should be null"
exit 1
fi

status_length=$(echo "$query"|sed "/has been deleted/d"|jq '."task-status"|length')
status_length=$(cdc cli changefeed query -c=$changefeed|sed "/has been deleted/d"|jq '."task-status"|length')
if [[ ! "$status_length" -eq "0" ]]; then
echo "unexpected task status length $status_length, should be 0"
exit 1
Expand Down
23 changes: 20 additions & 3 deletions tests/cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,33 @@ EOF
# Resume changefeed
run_cdc_cli changefeed --changefeed-id $uuid resume && sleep 3
jobtype=$(run_cdc_cli changefeed --changefeed-id $uuid query 2>&1 | grep 'admin-job-type' | grep -oE '[0-9]' | head -1)
if [[ $jobtype != 0 ]]; then
echo "[$(date)] <<<<< unexpect admin job type! expect 0 got ${jobtype} >>>>>"
if [[ $jobtype != 2 ]]; then
echo "[$(date)] <<<<< unexpect admin job type! expect 2 got ${jobtype} >>>>>"
exit 1
fi
check_changefeed_state $uuid "normal"

# Remove changefeed
run_cdc_cli changefeed --changefeed-id $uuid remove && sleep 3
check_changefeed_count http://${UP_PD_HOST_1}:${UP_PD_PORT_1} 0
jobtype=$(run_cdc_cli changefeed --changefeed-id $uuid query 2>&1 | grep 'admin-job-type' | grep -oE '[0-9]' | head -1)
if [[ $jobtype != 3 ]]; then
echo "[$(date)] <<<<< unexpect admin job type! expect 3 got ${jobtype} >>>>>"
exit 1
fi
check_changefeed_state $uuid "removed"

set +e
# Make sure changefeed can not be created if a removed changefeed with the same name exists
create_log=$(run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="$uuid" 2>&1)
set -e
exists=$(echo $create_log | grep -oE 'already exists')
if [[ -z $exists ]]; then
echo "[$(date)] <<<<< unexpect output got ${create_log} >>>>>"
exit 1
fi

# force remove the changefeed, and re create a new one with the same name
run_cdc_cli changefeed --changefeed-id $uuid remove --force && sleep 3
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid" && sleep 3
check_changefeed_state $uuid "normal"

Expand Down
9 changes: 3 additions & 6 deletions tests/cyclic_abc/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,14 @@ function run() {
--pd "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" \
--cyclic-replica-id 1 \
--cyclic-filter-replica-ids 2 \
--cyclic-sync-ddl true \
--config $CUR/conf/changefeed.toml
--cyclic-sync-ddl true

run_cdc_cli changefeed create --start-ts=$start_ts \
--sink-uri="mysql://root@${TLS_TIDB_HOST}:${TLS_TIDB_PORT}/?safe-mode=false&ssl-ca=${TLS_DIR}/ca.pem&ssl-cert=${TLS_DIR}/server.pem?ssl-key=${TLS_DIR}/server-key.pem" \
--pd "http://${DOWN_PD_HOST}:${DOWN_PD_PORT}" \
--cyclic-replica-id 2 \
--cyclic-filter-replica-ids 3 \
--cyclic-sync-ddl true \
--config $CUR/conf/changefeed.toml
--cyclic-sync-ddl true

run_cdc_cli changefeed create --start-ts=$start_ts \
--sink-uri="mysql://root@${UP_TIDB_HOST}:${UP_TIDB_PORT}/?safe-mode=false" \
Expand All @@ -122,8 +120,7 @@ function run() {
--key=$TLS_DIR/client-key.pem \
--cyclic-replica-id 3 \
--cyclic-filter-replica-ids 1 \
--cyclic-sync-ddl false \
--config $CUR/conf/changefeed.toml
--cyclic-sync-ddl false

for i in $(seq 6 15); do {
sqlup="START TRANSACTION;"
Expand Down
2 changes: 1 addition & 1 deletion tests/ddl_reentrant/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ function ddl_test() {

echo $ddl > ${WORK_DIR}/ddl_temp.sql
ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" true
ddl_start_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log|tail -n 1|grep -oE '"StartTs\\":[0-9]{18}'|awk -F: '{print $(NF)}')
ddl_start_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log|tail -n 1|grep -oE '"start_ts\\":[0-9]{18}'|awk -F: '{print $(NF)}')
cdc cli changefeed remove --changefeed-id=${changefeedid}
changefeedid=$(cdc cli changefeed create --no-confirm --start-ts=${ddl_start_ts} --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
echo "create new changefeed ${changefeedid} from ${ddl_start_ts}"
Expand Down
3 changes: 2 additions & 1 deletion tests/gc_safepoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ function run() {
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
fi
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/InjectGcSafepointUpdateInterval=return(500)' # new owner
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

Expand Down Expand Up @@ -114,10 +113,12 @@ function run() {

# remove paused changefeed, the safe_point forward will recover
cdc cli changefeed remove --changefeed-id=$changefeed_id --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "removed"
ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id

# remove all changefeeds, the safe_point will be cleared
cdc cli changefeed remove --changefeed-id=$changefeed_id2 --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "removed"
ensure $MAX_RETRIES check_safepoint_cleared $pd_addr $pd_cluster_id

cleanup_process $CDC_BINARY
Expand Down
7 changes: 5 additions & 2 deletions tests/kafka_sink_error_resume/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ function run() {
run_sql "CREATE table kafka_sink_error_resume.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO kafka_sink_error_resume.t1 VALUES ();"

ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "error"
cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr
for i in $(seq 1 4); do
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped"
cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr
sleep 5
done
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal"

check_table_exists "kafka_sink_error_resume.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
Expand Down
7 changes: 2 additions & 5 deletions tests/kill_owner_with_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ function kill_cdc_and_restart() {
work_dir=$2
cdc_binary=$3
MAX_RETRIES=10
status=$(curl -s http://127.0.0.1:8300/status)
cdc_pid=$(echo "$status"|jq '.pid')

cdc_pid=$(curl -s http://127.0.0.1:8300/status|jq '.pid')
kill $cdc_pid
ensure $MAX_RETRIES check_capture_count $pd_addr 0
run_cdc_server --workdir $work_dir --binary $cdc_binary --addr "127.0.0.1:8300" --pd $pd_addr
Expand Down Expand Up @@ -56,8 +54,7 @@ function run() {
run_sql "CREATE table kill_owner_with_ddl.t1 (id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
check_table_exists "kill_owner_with_ddl.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkExecDDLDelay=return(true);github.com/pingcap/ticdc/cdc/ownerFlushIntervalInject=return(0)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkExecDDLDelay=return(true);github.com/pingcap/ticdc/cdc/capture/ownerFlushIntervalInject=return(10)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkExecDDLDelay=return(true);github.com/pingcap/ticdc/cdc/ownerFlushIntervalInject=return(0)'
kill_cdc_and_restart $pd_addr $WORK_DIR $CDC_BINARY

for i in $(seq 2 3); do
Expand Down
3 changes: 1 addition & 2 deletions tests/owner_remove_table_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ function run() {
pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1";

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/OwnerRemoveTableError=1*return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/OwnerRemoveTableError=1*return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/OwnerRemoveTableError=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

Expand Down
Loading

0 comments on commit cafc722

Please sign in to comment.