From 76c31f1e1685caa6a7402f715a6a202c2d5410fc Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 23 Apr 2024 12:27:10 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #10932 Signed-off-by: ti-chi-bot --- cdc/entry/mounter.go | 32 ++++++ cdc/entry/mounter_test.go | 219 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 251 insertions(+) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 918f1f807a0..95fe1253473 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -343,8 +343,13 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { func datum2Column( tableInfo *model.TableInfo, datums map[int64]types.Datum, tz *time.Location, +<<<<<<< HEAD ) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) +======= +) ([]*model.ColumnData, []types.Datum, []*timodel.ColumnInfo, error) { + cols := make([]*model.ColumnData, len(tableInfo.RowColumnsOffset)) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) // columnInfos and rowColumnInfos hold different column metadata, @@ -374,7 +379,11 @@ func datum2Column( if exist { colValue, size, warn, err = formatColVal(colDatums, colInfo) } else { +<<<<<<< HEAD colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz) +======= + colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) } if err != nil { return nil, nil, nil, nil, errors.Trace(err) @@ -521,7 +530,11 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info +<<<<<<< HEAD preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz) +======= + preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -550,7 +563,11 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d current uint32 ) if row.RowExist { +<<<<<<< HEAD cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, m.tz) +======= + cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, m.tz) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -717,7 +734,13 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) ( // https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 // Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support // TODO: Check default expr support +<<<<<<< HEAD func getDefaultOrZeroValue(col *timodel.ColumnInfo, tz *time.Location) (types.Datum, any, int, string, error) { +======= +func getDefaultOrZeroValue( + col *timodel.ColumnInfo, tz *time.Location, +) (types.Datum, any, int, string, error) { +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) var ( d types.Datum err error @@ -736,6 +759,15 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo, tz *time.Location) (types.Da if err != nil { return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err) } + switch col.GetType() { + case mysql.TypeTimestamp: + t := d.GetMysqlTime() + err = t.ConvertTimeZone(time.UTC, tz) + if err != nil { + return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err) + } + d.SetMysqlTime(t) + } } else if !mysql.HasNotNullFlag(col.GetFlag()) { // NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx", // ref: https://github.com/pingcap/ticdc/issues/3929 diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index a3cd7a56596..d165055d9ef 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -876,11 +876,17 @@ func TestGetDefaultZeroValue(t *testing.T) { FieldType: *ftTypeTimestampNotNull, } _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) +<<<<<<< HEAD sc := new(stmtctx.StatementContext) sc.TimeZone = tz expected, err := types.ParseTimeFromFloatString( sc, "2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) +======= + expected, err := types.ParseTimeFromFloatString( + types.DefaultStmtNoWarningContext, + "2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) require.NoError(t, err) require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + notnull + default") @@ -890,8 +896,13 @@ func TestGetDefaultZeroValue(t *testing.T) { } _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) expected, err = types.ParseTimeFromFloatString( +<<<<<<< HEAD sc, "2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) +======= + types.DefaultStmtNoWarningContext, + "2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) require.NoError(t, err) require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + null + default") @@ -918,6 +929,210 @@ func TestVerifyChecksumTime(t *testing.T) { helper := NewSchemaTestHelper(t) defer helper.Close() +<<<<<<< HEAD +======= + tk := helper.Tk() + // upstream TiDB enable checksum functionality + tk.MustExec("set global tidb_enable_row_level_checksum = 1") + helper.Tk().MustExec("use test") + + // changefeed enable checksum functionality + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness + filter, err := filter.NewFilter(replicaConfig, "") + require.NoError(t, err) + + ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) + require.NoError(t, err) + + changefeed := model.DefaultChangeFeedID("changefeed-test-decode-row") + schemaStorage, err := NewSchemaStorage(helper.Storage(), + ver.Ver, false, changefeed, util.RoleTester, filter) + require.NoError(t, err) + require.NotNil(t, schemaStorage) + + createTableSQL := `create table t ( + id int primary key auto_increment, + + c_tinyint tinyint null, + c_smallint smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + + c_unsigned_tinyint tinyint unsigned null, + c_unsigned_smallint smallint unsigned null, + c_unsigned_mediumint mediumint unsigned null, + c_unsigned_int int unsigned null, + c_unsigned_bigint bigint unsigned null, + + c_float float null, + c_double double null, + c_decimal decimal null, + c_decimal_2 decimal(10, 4) null, + + c_unsigned_float float unsigned null, + c_unsigned_double double unsigned null, + c_unsigned_decimal decimal unsigned null, + c_unsigned_decimal_2 decimal(10, 4) unsigned null, + + c_date date null, + c_datetime datetime null, + c_timestamp timestamp null, + c_time time null, + c_year year null, + + c_tinytext tinytext null, + c_text text null, + c_mediumtext mediumtext null, + c_longtext longtext null, + + c_tinyblob tinyblob null, + c_blob blob null, + c_mediumblob mediumblob null, + c_longblob longblob null, + + c_char char(16) null, + c_varchar varchar(16) null, + c_binary binary(16) null, + c_varbinary varbinary(16) null, + + c_enum enum ('a','b','c') null, + c_set set ('a','b','c') null, + c_bit bit(64) null, + c_json json null, + +-- gbk dmls + name varchar(128) CHARACTER SET gbk, + country char(32) CHARACTER SET gbk, + city varchar(64), + description text CHARACTER SET gbk, + image tinyblob +);` + job := helper.DDL2Job(createTableSQL) + err = schemaStorage.HandleDDLJob(job) + require.NoError(t, err) + + ts := schemaStorage.GetLastSnapshot().CurrentTs() + schemaStorage.AdvanceResolvedTs(ver.Ver) + + mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, replicaConfig.Integrity).(*mounter) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "t") + require.True(t, ok) + + tk.Session().GetSessionVars().EnableRowLevelChecksum = true + + insertDataSQL := `insert into t values ( + 2, + 1, 2, 3, 4, 5, + 1, 2, 3, 4, 5, + 2020.0202, 2020.0303, 2020.0404, 2021.1208, + 3.1415, 2.7182, 8000, 179394.233, + '2020-02-20', '2020-02-20 02:20:20', '2020-02-20 02:20:20', '02:20:20', '2020', + '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', + x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + '89504E470D0A1A0A', '89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + 'b', 'b,c', b'1000001', '{ +"key1": "value1", +"key2": "value2", +"key3": "123" +}', + '测试', "中国", "上海", "你好,世界", 0xC4E3BAC3CAC0BDE7 +);` + tk.MustExec(insertDataSQL) + + key, value := getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID) + rawKV := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + Value: value, + StartTs: ts - 1, + CRTs: ts + 1, + } + row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV) + require.NoError(t, err) + require.NotNil(t, row) + require.NotNil(t, row.Checksum) + + expected, ok := mounter.decoder.GetChecksum() + require.True(t, ok) + require.Equal(t, expected, row.Checksum.Current) + require.False(t, row.Checksum.Corrupted) + + // avro encoder enable checksum functionality. + codecConfig := codecCommon.NewConfig(config.ProtocolAvro) + codecConfig.EnableTiDBExtension = true + codecConfig.EnableRowChecksum = true + codecConfig.AvroDecimalHandlingMode = "string" + codecConfig.AvroBigintUnsignedHandlingMode = "string" + + avroEncoder, err := avro.SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig) + defer avro.TeardownEncoderAndSchemaRegistry4Testing() + require.NoError(t, err) + + topic := "test.t" + + err = avroEncoder.AppendRowChangedEvent(ctx, topic, row, func() {}) + require.NoError(t, err) + msg := avroEncoder.Build() + require.Len(t, msg, 1) + + schemaM, err := avro.NewConfluentSchemaManager( + ctx, "http://127.0.0.1:8081", nil) + require.NoError(t, err) + + // decoder enable checksum functionality. + decoder := avro.NewDecoder(codecConfig, schemaM, topic) + err = decoder.AddKeyValue(msg[0].Key, msg[0].Value) + require.NoError(t, err) + + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, messageType) + + row, err = decoder.NextRowChangedEvent() + // no error, checksum verification passed. + require.NoError(t, err) +} + +func TestTimezoneDefaultValue(t *testing.T) { + helper := NewSchemaTestHelper(t) + defer helper.Close() + + _ = helper.DDL2Event(`create table test.t(a int primary key)`) + insertEvent := helper.DML2Event(`insert into test.t values (1)`, "test", "t") + require.NotNil(t, insertEvent) + + tableInfo, ok := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t") + require.True(t, ok) + + key, oldValue := helper.getLastKeyValue(tableInfo.ID) + + _ = helper.DDL2Event(`alter table test.t add column b timestamp default '2023-02-09 13:00:00'`) + ts := helper.schemaStorage.GetLastSnapshot().CurrentTs() + rawKV := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + OldValue: oldValue, + StartTs: ts - 1, + CRTs: ts + 1, + } + polymorphicEvent := model.NewPolymorphicEvent(rawKV) + err := helper.mounter.DecodeEvent(context.Background(), polymorphicEvent) + require.NoError(t, err) + + event := polymorphicEvent.Row + require.NotNil(t, event) + require.Equal(t, "2023-02-09 13:00:00", event.PreColumns[1].Value.(string)) +} + +func TestVerifyChecksumTime(t *testing.T) { +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) replicaConfig := config.GetDefaultReplicaConfig() replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness replicaConfig.Integrity.CorruptionHandleLevel = integrity.CorruptionHandleLevelError @@ -1317,7 +1532,11 @@ func TestBuildTableInfo(t *testing.T) { originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) require.NoError(t, err) cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) +<<<<<<< HEAD cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz) +======= + colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) require.NoError(t, err) recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI) From e1ae44b2d64ca01ad5acdda1f3ef211bd8684351 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 26 Apr 2024 16:34:05 +0800 Subject: [PATCH 2/2] fix conflicts. --- cdc/entry/mounter.go | 23 ---- cdc/entry/mounter_test.go | 219 -------------------------------------- 2 files changed, 242 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 95fe1253473..b75be1f7df0 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -343,13 +343,8 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { func datum2Column( tableInfo *model.TableInfo, datums map[int64]types.Datum, tz *time.Location, -<<<<<<< HEAD ) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) -======= -) ([]*model.ColumnData, []types.Datum, []*timodel.ColumnInfo, error) { - cols := make([]*model.ColumnData, len(tableInfo.RowColumnsOffset)) ->>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) // columnInfos and rowColumnInfos hold different column metadata, @@ -379,11 +374,7 @@ func datum2Column( if exist { colValue, size, warn, err = formatColVal(colDatums, colInfo) } else { -<<<<<<< HEAD colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz) -======= - colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz) ->>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) } if err != nil { return nil, nil, nil, nil, errors.Trace(err) @@ -530,11 +521,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info -<<<<<<< HEAD preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz) -======= - preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz) ->>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -563,11 +550,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d current uint32 ) if row.RowExist { -<<<<<<< HEAD cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, m.tz) -======= - cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, m.tz) ->>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -734,13 +717,7 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) ( // https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 // Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support // TODO: Check default expr support -<<<<<<< HEAD func getDefaultOrZeroValue(col *timodel.ColumnInfo, tz *time.Location) (types.Datum, any, int, string, error) { -======= -func getDefaultOrZeroValue( - col *timodel.ColumnInfo, tz *time.Location, -) (types.Datum, any, int, string, error) { ->>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) var ( d types.Datum err error diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index d165055d9ef..2cfd272f8bb 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -876,17 +876,11 @@ func TestGetDefaultZeroValue(t *testing.T) { FieldType: *ftTypeTimestampNotNull, } _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) -<<<<<<< HEAD sc := new(stmtctx.StatementContext) sc.TimeZone = tz expected, err := types.ParseTimeFromFloatString( sc, - "2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) -======= - expected, err := types.ParseTimeFromFloatString( - types.DefaultStmtNoWarningContext, "2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) ->>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) require.NoError(t, err) require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + notnull + default") @@ -896,13 +890,8 @@ func TestGetDefaultZeroValue(t *testing.T) { } _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) expected, err = types.ParseTimeFromFloatString( -<<<<<<< HEAD sc, - "2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) -======= - types.DefaultStmtNoWarningContext, "2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) ->>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) require.NoError(t, err) require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + null + default") @@ -929,210 +918,6 @@ func TestVerifyChecksumTime(t *testing.T) { helper := NewSchemaTestHelper(t) defer helper.Close() -<<<<<<< HEAD -======= - tk := helper.Tk() - // upstream TiDB enable checksum functionality - tk.MustExec("set global tidb_enable_row_level_checksum = 1") - helper.Tk().MustExec("use test") - - // changefeed enable checksum functionality - replicaConfig := config.GetDefaultReplicaConfig() - replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness - filter, err := filter.NewFilter(replicaConfig, "") - require.NoError(t, err) - - ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) - require.NoError(t, err) - - changefeed := model.DefaultChangeFeedID("changefeed-test-decode-row") - schemaStorage, err := NewSchemaStorage(helper.Storage(), - ver.Ver, false, changefeed, util.RoleTester, filter) - require.NoError(t, err) - require.NotNil(t, schemaStorage) - - createTableSQL := `create table t ( - id int primary key auto_increment, - - c_tinyint tinyint null, - c_smallint smallint null, - c_mediumint mediumint null, - c_int int null, - c_bigint bigint null, - - c_unsigned_tinyint tinyint unsigned null, - c_unsigned_smallint smallint unsigned null, - c_unsigned_mediumint mediumint unsigned null, - c_unsigned_int int unsigned null, - c_unsigned_bigint bigint unsigned null, - - c_float float null, - c_double double null, - c_decimal decimal null, - c_decimal_2 decimal(10, 4) null, - - c_unsigned_float float unsigned null, - c_unsigned_double double unsigned null, - c_unsigned_decimal decimal unsigned null, - c_unsigned_decimal_2 decimal(10, 4) unsigned null, - - c_date date null, - c_datetime datetime null, - c_timestamp timestamp null, - c_time time null, - c_year year null, - - c_tinytext tinytext null, - c_text text null, - c_mediumtext mediumtext null, - c_longtext longtext null, - - c_tinyblob tinyblob null, - c_blob blob null, - c_mediumblob mediumblob null, - c_longblob longblob null, - - c_char char(16) null, - c_varchar varchar(16) null, - c_binary binary(16) null, - c_varbinary varbinary(16) null, - - c_enum enum ('a','b','c') null, - c_set set ('a','b','c') null, - c_bit bit(64) null, - c_json json null, - --- gbk dmls - name varchar(128) CHARACTER SET gbk, - country char(32) CHARACTER SET gbk, - city varchar(64), - description text CHARACTER SET gbk, - image tinyblob -);` - job := helper.DDL2Job(createTableSQL) - err = schemaStorage.HandleDDLJob(job) - require.NoError(t, err) - - ts := schemaStorage.GetLastSnapshot().CurrentTs() - schemaStorage.AdvanceResolvedTs(ver.Ver) - - mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, replicaConfig.Integrity).(*mounter) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "t") - require.True(t, ok) - - tk.Session().GetSessionVars().EnableRowLevelChecksum = true - - insertDataSQL := `insert into t values ( - 2, - 1, 2, 3, 4, 5, - 1, 2, 3, 4, 5, - 2020.0202, 2020.0303, 2020.0404, 2021.1208, - 3.1415, 2.7182, 8000, 179394.233, - '2020-02-20', '2020-02-20 02:20:20', '2020-02-20 02:20:20', '02:20:20', '2020', - '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', - x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', - '89504E470D0A1A0A', '89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', - 'b', 'b,c', b'1000001', '{ -"key1": "value1", -"key2": "value2", -"key3": "123" -}', - '测试', "中国", "上海", "你好,世界", 0xC4E3BAC3CAC0BDE7 -);` - tk.MustExec(insertDataSQL) - - key, value := getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID) - rawKV := &model.RawKVEntry{ - OpType: model.OpTypePut, - Key: key, - Value: value, - StartTs: ts - 1, - CRTs: ts + 1, - } - row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV) - require.NoError(t, err) - require.NotNil(t, row) - require.NotNil(t, row.Checksum) - - expected, ok := mounter.decoder.GetChecksum() - require.True(t, ok) - require.Equal(t, expected, row.Checksum.Current) - require.False(t, row.Checksum.Corrupted) - - // avro encoder enable checksum functionality. - codecConfig := codecCommon.NewConfig(config.ProtocolAvro) - codecConfig.EnableTiDBExtension = true - codecConfig.EnableRowChecksum = true - codecConfig.AvroDecimalHandlingMode = "string" - codecConfig.AvroBigintUnsignedHandlingMode = "string" - - avroEncoder, err := avro.SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig) - defer avro.TeardownEncoderAndSchemaRegistry4Testing() - require.NoError(t, err) - - topic := "test.t" - - err = avroEncoder.AppendRowChangedEvent(ctx, topic, row, func() {}) - require.NoError(t, err) - msg := avroEncoder.Build() - require.Len(t, msg, 1) - - schemaM, err := avro.NewConfluentSchemaManager( - ctx, "http://127.0.0.1:8081", nil) - require.NoError(t, err) - - // decoder enable checksum functionality. - decoder := avro.NewDecoder(codecConfig, schemaM, topic) - err = decoder.AddKeyValue(msg[0].Key, msg[0].Value) - require.NoError(t, err) - - messageType, hasNext, err := decoder.HasNext() - require.NoError(t, err) - require.True(t, hasNext) - require.Equal(t, model.MessageTypeRow, messageType) - - row, err = decoder.NextRowChangedEvent() - // no error, checksum verification passed. - require.NoError(t, err) -} - -func TestTimezoneDefaultValue(t *testing.T) { - helper := NewSchemaTestHelper(t) - defer helper.Close() - - _ = helper.DDL2Event(`create table test.t(a int primary key)`) - insertEvent := helper.DML2Event(`insert into test.t values (1)`, "test", "t") - require.NotNil(t, insertEvent) - - tableInfo, ok := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t") - require.True(t, ok) - - key, oldValue := helper.getLastKeyValue(tableInfo.ID) - - _ = helper.DDL2Event(`alter table test.t add column b timestamp default '2023-02-09 13:00:00'`) - ts := helper.schemaStorage.GetLastSnapshot().CurrentTs() - rawKV := &model.RawKVEntry{ - OpType: model.OpTypePut, - Key: key, - OldValue: oldValue, - StartTs: ts - 1, - CRTs: ts + 1, - } - polymorphicEvent := model.NewPolymorphicEvent(rawKV) - err := helper.mounter.DecodeEvent(context.Background(), polymorphicEvent) - require.NoError(t, err) - - event := polymorphicEvent.Row - require.NotNil(t, event) - require.Equal(t, "2023-02-09 13:00:00", event.PreColumns[1].Value.(string)) -} - -func TestVerifyChecksumTime(t *testing.T) { ->>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) replicaConfig := config.GetDefaultReplicaConfig() replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness replicaConfig.Integrity.CorruptionHandleLevel = integrity.CorruptionHandleLevelError @@ -1532,11 +1317,7 @@ func TestBuildTableInfo(t *testing.T) { originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) require.NoError(t, err) cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) -<<<<<<< HEAD cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz) -======= - colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz) ->>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) require.NoError(t, err) recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)