cmd/storage-consumer(ticdc): fix broken tests (pingcap#8566)
Rustin170506 authored Mar 21, 2023
1 parent 2b1b5ca commit 35921ef
Showing 14 changed files with 315 additions and 28 deletions.
5 changes: 1 addition & 4 deletions Makefile
@@ -197,7 +197,7 @@ check_third_party_binary:
@which bin/jq
@which bin/minio

integration_test_build: check_failpoint_ctl
integration_test_build: check_failpoint_ctl storage_consumer kafka_consumer
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/tiflow/... \
@@ -243,9 +243,6 @@ clean_integration_test_containers: ## Clean MySQL and Kafka integration test con
docker-compose -f $(TICDC_DOCKER_DEPLOYMENTS_DIR)/docker-compose-mysql-integration.yml down -v
docker-compose -f $(TICDC_DOCKER_DEPLOYMENTS_DIR)/docker-compose-kafka-integration.yml down -v

integration_test_storage: check_third_party_binary
tests/integration_tests/run.sh storage "$(CASE)" "$(START_AT)"

fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci
@echo "run gci (format imports)"
tools/bin/gci write $(FILES) 2>&1 | $(FAIL_ON_STDOUT)
13 changes: 11 additions & 2 deletions cmd/storage-consumer/main.go
@@ -65,6 +65,7 @@ var (
logLevel string
flushInterval time.Duration
enableProfiling bool
timezone string
)

const (
@@ -81,6 +82,7 @@ func init() {
flag.StringVar(&logLevel, "log-level", "info", "log level")
flag.DurationVar(&flushInterval, "flush-interval", 10*time.Second, "flush interval")
flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling")
flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer")
flag.Parse()

err := logutil.InitLogger(&logutil.Config{
@@ -245,6 +247,11 @@ type consumer struct {
}

func newConsumer(ctx context.Context) (*consumer, error) {
tz, err := putil.GetTimezone(timezone)
if err != nil {
return nil, errors.Annotate(err, "can not load timezone")
}
ctx = contextutil.PutTimezoneInCtx(ctx, tz)
replicaConfig := config.GetDefaultReplicaConfig()
if len(configFile) > 0 {
err := util.StrictDecodeFile(configFile, "storage consumer", replicaConfig)
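The new -tz flag is resolved to a *time.Location at the top of newConsumer and then stashed in the context. A minimal sketch of what that lookup likely amounts to, assuming putil.GetTimezone treats the default value "System" as the local zone and resolves other names through the IANA tz database (an assumption about the helper, not its actual source):

```go
package main

import (
	"fmt"
	"time"
)

// getTimezone is an illustrative stand-in for putil.GetTimezone as used above:
// "System" (the flag default) falls back to the local zone, anything else is
// looked up in the IANA tz database.
func getTimezone(name string) (*time.Location, error) {
	if name == "" || name == "System" {
		return time.Local, nil
	}
	return time.LoadLocation(name) // e.g. "Asia/Shanghai", "UTC"
}

func main() {
	tz, err := getTimezone("System")
	if err != nil {
		panic(err)
	}
	fmt.Println("consumer timezone:", tz)
}
```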
@@ -254,7 +261,7 @@ func newConsumer(ctx context.Context) (*consumer, error) {
}
}

err := replicaConfig.ValidateAndAdjust(upstreamURI)
err = replicaConfig.ValidateAndAdjust(upstreamURI)
if err != nil {
log.Error("failed to validate replica config", zap.Error(err))
return nil, err
@@ -435,7 +442,9 @@ func (c *consumer) emitDMLEvents(
return errors.Trace(err)
}
case config.ProtocolCanalJSON:
decoder = canal.NewBatchDecoder(content, false, c.codecCfg.Terminator)
// Always enable tidb extension for canal-json protocol
// because we need to get the commit ts from the extension field.
decoder = canal.NewBatchDecoder(content, true, c.codecCfg.Terminator)
}

cnt := 0
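For context on the decoder change above: with the TiDB extension enabled, TiCDC's canal-json messages carry an extra "_tidb" object whose commitTs field is what the consumer needs in order to sort and flush events. A small sketch of pulling that field out of a message; the surrounding payload is illustrative, only the "_tidb"."commitTs" field is the point here:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type tidbExtension struct {
	CommitTs uint64 `json:"commitTs"`
}

type canalJSONMessage struct {
	Database string         `json:"database"`
	Table    string         `json:"table"`
	Type     string         `json:"type"`
	TiDB     *tidbExtension `json:"_tidb"` // only present when the extension is enabled
}

func main() {
	raw := []byte(`{"database":"test","table":"t1","type":"INSERT","_tidb":{"commitTs":426858937573097473}}`)
	var msg canalJSONMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		panic(err)
	}
	fmt.Println("commit ts:", msg.TiDB.CommitTs)
}
```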
2 changes: 1 addition & 1 deletion deployments/ticdc/docker/integration-test.Dockerfile
@@ -57,6 +57,6 @@ COPY . .
# Clean bin dir and build TiCDC.
# We always need to clean before we build; please don't change this order.
RUN make clean
RUN make integration_test_build kafka_consumer storage_consumer cdc
RUN make integration_test_build cdc
COPY --from=downloader /root/download/bin/* ./bin/
RUN make check_third_party_binary
2 changes: 1 addition & 1 deletion tests/integration_tests/canal_json_storage_basic/conf/diff_config.toml
@@ -7,7 +7,7 @@ export-fix-sql = true
check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/clustered_index/sync_diff/output"
output-dir = "/tmp/tidb_cdc_test/canal_json_storage_basic/sync_diff/output"

source-instances = ["mysql1"]

9 changes: 6 additions & 3 deletions tests/integration_tests/canal_json_storage_basic/run.sh
@@ -42,23 +42,26 @@ stop() {
s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_ENDPOINT --host-bucket=$S3_ENDPOINT --no-ssl mb s3://logbucket

function run() {
if [ "$SINK_TYPE" != "storage" ]; then
# For now, we run the storage tests as part of the mysql sink tests.
# This is a temporary arrangement; they will move to a dedicated test pipeline later.
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

SINK_URI="s3://logbucket/storage_test?flush-interval=5s&endpoint=http://127.0.0.1:24927/"
# Enable tidb extension to generate the commit ts.
SINK_URI="s3://logbucket/storage_test?flush-interval=5s&enable-tidb-extension=true&endpoint=http://127.0.0.1:24927/"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml

run_sql_file $CUR/data/schema.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/schema.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_storage_consumer $WORK_DIR "s3://logbucket/storage_test?endpoint=http://127.0.0.1:24927/" $CUR/conf/changefeed.toml ""
sleep 8
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100
}

trap stop EXIT
2 changes: 1 addition & 1 deletion tests/integration_tests/csv_storage_basic/conf/diff_config.toml
@@ -7,7 +7,7 @@ export-fix-sql = true
check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/clustered_index/sync_diff/output"
output-dir = "/tmp/tidb_cdc_test/csv_storage_basic/sync_diff/output"

source-instances = ["mysql1"]

6 changes: 4 additions & 2 deletions tests/integration_tests/csv_storage_basic/run.sh
@@ -42,7 +42,9 @@ stop() {
s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_ENDPOINT --host-bucket=$S3_ENDPOINT --no-ssl mb s3://logbucket

function run() {
if [ "$SINK_TYPE" != "storage" ]; then
# For now, we run the storage tests as part of the mysql sink tests.
# This is a temporary arrangement; they will move to a dedicated test pipeline later.
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

@@ -58,7 +60,7 @@ function run() {
run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_storage_consumer $WORK_DIR "s3://logbucket/storage_test?endpoint=http://127.0.0.1:24927/" $CUR/conf/changefeed.toml ""
sleep 8
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100
}

trap stop EXIT
29 changes: 29 additions & 0 deletions tests/integration_tests/csv_storage_multi_tables_ddl/conf/diff_config.toml
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/csv_storage_multi_tables_ddl/sync_diff/output"

source-instances = ["tidb0"]

target-instance = "mysql1"

target-check-tables = ["multi_tables_ddl_test.t1_7", "multi_tables_ddl_test.t2_7"]

[data-sources]
[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
5 changes: 5 additions & 0 deletions tests/integration_tests/csv_storage_multi_tables_ddl/conf/… (changefeed config for the cf_err1 changefeed)
@@ -0,0 +1,5 @@
[filter]
rules = ["multi_tables_ddl_test.t5", "multi_tables_ddl_test.t6", "multi_tables_ddl_test.t7", "multi_tables_ddl_test.t8"]

[sink.csv]
include-commit-ts = true
5 changes: 5 additions & 0 deletions tests/integration_tests/csv_storage_multi_tables_ddl/conf/… (changefeed config for the cf_err2 changefeed)
@@ -0,0 +1,5 @@
[filter]
rules = ["multi_tables_ddl_test.t9", "multi_tables_ddl_test.t10"]

[sink.csv]
include-commit-ts = true
5 changes: 5 additions & 0 deletions tests/integration_tests/csv_storage_multi_tables_ddl/conf/… (changefeed config for the main changefeed)
@@ -0,0 +1,5 @@
[filter]
rules = ["multi_tables_ddl_test.t1", "multi_tables_ddl_test.t2", "multi_tables_ddl_test.t3", "multi_tables_ddl_test.t4" ,"multi_tables_ddl_test.t1_7","multi_tables_ddl_test.t2_7", "multi_tables_ddl_test.finish_mark"]

[sink.csv]
include-commit-ts = true
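The include-commit-ts = true entries above add the change's commit ts to every CSV row, giving the consumer something to order replayed events by. A rough sketch of reading that column; the column layout used here (operation, table, schema, commit ts, then values) is an assumption for illustration, not a guaranteed format:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Hypothetical row layout: operation, table, schema, commit-ts, column values...
	raw := `"I","t1_7","multi_tables_ddl_test",426858937573097473,"91891826145","5"`
	rec, err := csv.NewReader(strings.NewReader(raw)).Read()
	if err != nil {
		panic(err)
	}
	commitTs, err := strconv.ParseUint(rec[3], 10, 64)
	if err != nil {
		panic(err)
	}
	fmt.Println("commit ts:", commitTs)
}
```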
139 changes: 139 additions & 0 deletions tests/integration_tests/csv_storage_multi_tables_ddl/data/test.sql
@@ -0,0 +1,139 @@
drop database if exists `multi_tables_ddl_test`;
create database `multi_tables_ddl_test`;
use `multi_tables_ddl_test`;

create table t1 (
value64 bigint unsigned not null,
primary key(value64)
);
insert into t1 values(17156792991891826145);
insert into t1 values(91891826145);
delete from t1 where value64=17156792991891826145;
update t1 set value64=17156792991891826;
update t1 set value64=56792991891826;

rename table t1 to t1_1;

create table t2 (
value64 bigint unsigned not null,
primary key(value64)
);
insert into t2 values(17156792991891826145);
insert into t2 values(91891826145);
delete from t2 where value64=91891826145;
update t2 set value64=17156792991891826;
update t2 set value64=56792991891826;

rename table t2 to t2_2;

create table t1 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

create table t2 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

create table t3 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

create table t4 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

/* used by the error1 changefeed */
create table t5 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

/* used by the error1 changefeed */
create table t6 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

/* used by the error1 changefeed */
create table t7 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

/* used by the error1 changefeed */
create table t8 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

/* used by the error2 changefeed */
create table t10 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

/* used by the error2 changefeed */
create table t11 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

insert into t1 values(17156792991891826145, 1);
insert into t1 values( 9223372036854775807, 2);
insert into t2 values(17156792991891826145, 3);
insert into t2 values( 9223372036854775807, 4);

insert into t5 values(17156792991891826145, 1);
insert into t6 values( 9223372036854775807, 2);
insert into t7 values(17156792991891826145, 3);
insert into t8 values( 9223372036854775807, 4);


rename table t1 to t1_7, t2 to t2_7;

insert into t1_7 values(91891826145, 5);
insert into t1_7 values(685477580, 6);
insert into t2_7 values(1715679991826145, 7);
insert into t2_7 values(2036854775807, 8);

insert into t3 select * from t1_7;
insert into t4 select * from t2_7;
drop table t3, t4;



/* cf_err1, filter.rules = ["multi_tables_ddl_test.t5", "multi_tables_ddl_test.t6", "multi_tables_ddl_test.t7", "multi_tables_ddl_test.t8"] */
/* replicated successfully, since they are all in `filter.rules` */
rename table t5 to t55, t6 to t66;
/* discarded by cdc entirely, since none of them are in `filter.rules` */
rename table t55 to t555, t66 to t666;
/* replicated successfully, since t8 is in `filter.rules` */
rename table t8 to t88;
/* discarded, since neither t88 nor t888 is in `filter.rules` */
rename table t88 to t888;


/* cf_err2, filter.rules = ["multi_tables_ddl_test.t9", "multi_tables_ddl_test.t10"] */
/* replicated successfully, since t10 is in `filter.rules` */
rename table t10 to t9;
/* replicated successfully, since t9 is in `filter.rules` */
rename table t9 to t13;
/* discarded, since neither t13 nor t14 is in `filter.rules` */
rename table t13 to t14;
/* error, since t11 does not match `filter.rules` but t9 does */
rename table t11 to t9;
create table finish_mark(id int primary key);
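The comments above spell out the rename-handling rule these changefeed configs exercise: a rename is judged by the old table name, a rename whose old name is outside the filter but whose new name is inside it is an error, and a rename where neither side matches is dropped. A self-contained sketch of that rule, as an illustration of the behaviour described in the comments rather than TiCDC's actual implementation:

```go
package main

import "fmt"

// classifyRename mirrors the rule described in the SQL comments above:
// judge by the old name first, treat old-out/new-in as an error, and
// discard renames where neither name matches the filter.
func classifyRename(rules map[string]bool, oldName, newName string) string {
	switch {
	case rules[oldName]:
		return "replicate"
	case rules[newName]:
		return "error" // e.g. rename table t11 to t9 under cf_err2
	default:
		return "discard"
	}
}

func main() {
	cfErr2 := map[string]bool{"t9": true, "t10": true}
	fmt.Println(classifyRename(cfErr2, "t10", "t9"))  // replicate
	fmt.Println(classifyRename(cfErr2, "t13", "t14")) // discard
	fmt.Println(classifyRename(cfErr2, "t11", "t9"))  // error
}
```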