Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#10932
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Apr 23, 2024
1 parent 823a389 commit 5695d4f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 0 deletions.
26 changes: 26 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,13 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum, tz *time.Location,
<<<<<<< HEAD
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
=======
) ([]*model.ColumnData, []types.Datum, []*timodel.ColumnInfo, error) {
cols := make([]*model.ColumnData, len(tableInfo.RowColumnsOffset))
>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))

// columnInfos and rowColumnInfos hold different column metadata,
Expand Down Expand Up @@ -374,7 +379,11 @@ func datum2Column(
if exist {
colValue, size, warn, err = formatColVal(colDatums, colInfo)
} else {
<<<<<<< HEAD
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz)
=======
colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz)
>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932))
}
if err != nil {
return nil, nil, nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -508,7 +517,11 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
<<<<<<< HEAD
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz)
=======
preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz)
>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932))
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -537,7 +550,11 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
current uint32
)
if row.RowExist {
<<<<<<< HEAD
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, m.tz)
=======
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, m.tz)
>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932))
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -725,6 +742,15 @@ func getDefaultOrZeroValue(
if err != nil {
return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err)
}
switch col.GetType() {
case mysql.TypeTimestamp:
t := d.GetMysqlTime()
err = t.ConvertTimeZone(time.UTC, tz)
if err != nil {
return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err)
}
d.SetMysqlTime(t)
}
} else if !mysql.HasNotNullFlag(col.GetFlag()) {
// NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx",
// ref: https://github.com/pingcap/ticdc/issues/3929
Expand Down
70 changes: 70 additions & 0 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,11 +896,17 @@ func TestGetDefaultZeroValue(t *testing.T) {
FieldType: *ftTypeTimestampNotNull,
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
<<<<<<< HEAD
sc := new(stmtctx.StatementContext)
sc.TimeZone = tz
expected, err := types.ParseTimeFromFloatString(
sc,
"2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
=======
expected, err := types.ParseTimeFromFloatString(
types.DefaultStmtNoWarningContext,
"2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932))
require.NoError(t, err)
require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + notnull + default")

Expand All @@ -910,8 +916,13 @@ func TestGetDefaultZeroValue(t *testing.T) {
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expected, err = types.ParseTimeFromFloatString(
<<<<<<< HEAD
sc,
"2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
=======
types.DefaultStmtNoWarningContext,
"2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932))
require.NoError(t, err)
require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + null + default")

Expand Down Expand Up @@ -1107,6 +1118,61 @@ func TestE2ERowLevelChecksum(t *testing.T) {
require.NoError(t, err)
}

<<<<<<< HEAD
=======
func TestTimezoneDefaultValue(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()

_ = helper.DDL2Event(`create table test.t(a int primary key)`)
insertEvent := helper.DML2Event(`insert into test.t values (1)`, "test", "t")
require.NotNil(t, insertEvent)

tableInfo, ok := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t")
require.True(t, ok)

key, oldValue := helper.getLastKeyValue(tableInfo.ID)

_ = helper.DDL2Event(`alter table test.t add column b timestamp default '2023-02-09 13:00:00'`)
ts := helper.schemaStorage.GetLastSnapshot().CurrentTs()
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
OldValue: oldValue,
StartTs: ts - 1,
CRTs: ts + 1,
}
polymorphicEvent := model.NewPolymorphicEvent(rawKV)
err := helper.mounter.DecodeEvent(context.Background(), polymorphicEvent)
require.NoError(t, err)

event := polymorphicEvent.Row
require.NotNil(t, event)
require.Equal(t, "2023-02-09 13:00:00", event.PreColumns[1].Value.(string))
}

func TestVerifyChecksumTime(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
replicaConfig.Integrity.CorruptionHandleLevel = integrity.CorruptionHandleLevelError

helper := NewSchemaTestHelperWithReplicaConfig(t, replicaConfig)
defer helper.Close()

helper.Tk().MustExec("set global tidb_enable_row_level_checksum = 1")
helper.Tk().MustExec("use test")

helper.Tk().MustExec("set global time_zone = '-5:00'")
_ = helper.DDL2Event(`CREATE table TBL2 (a int primary key, b TIMESTAMP)`)
event := helper.DML2Event(`INSERT INTO TBL2 VALUES (1, '2023-02-09 13:00:00')`, "test", "TBL2")
require.NotNil(t, event)

_ = helper.DDL2Event("create table t (a timestamp primary key, b int)")
event = helper.DML2Event("insert into t values ('2023-02-09 13:00:00', 1)", "test", "t")
require.NotNil(t, event)
}

>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932))
func TestDecodeRowEnableChecksum(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()
Expand Down Expand Up @@ -1526,7 +1592,11 @@ func TestBuildTableInfo(t *testing.T) {
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
<<<<<<< HEAD
cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz)
=======
colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz)
>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932))
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
Expand Down

0 comments on commit 5695d4f

Please sign in to comment.