Skip to content

Commit

Permalink
point-get: calculate the row-level checksum in real time to avoid…
Browse files Browse the repository at this point in the history
… incorrect checksum caused by schema change (#52511)

close #52590
  • Loading branch information
3AceShowHand authored May 11, 2024
1 parent 1bac22a commit 6d92e7a
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 61 deletions.
12 changes: 10 additions & 2 deletions pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,24 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.index >= len(e.values) {
return nil
}

schema := e.Schema()
sctx := e.BaseExecutor.Ctx()
start := e.index
for !req.IsFull() && e.index < len(e.values) {
handle, val := e.handles[e.index], e.values[e.index]
err := DecodeRowValToChunk(e.BaseExecutor.Ctx(), e.Schema(), e.tblInfo, handle, val, req, e.rowDecoder)
err := DecodeRowValToChunk(sctx, schema, e.tblInfo, handle, val, req, e.rowDecoder)
if err != nil {
return err
}
e.index++
}

err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx().GetExprCtx(), req)
err := fillRowChecksum(sctx, start, e.index, schema, e.tblInfo, e.values, e.handles, req, nil)
if err != nil {
return err
}
err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, schema.Columns, e.columns, sctx.GetExprCtx(), req)
if err != nil {
return err
}
Expand Down
109 changes: 107 additions & 2 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package executor
import (
"context"
"fmt"
"sort"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -387,19 +389,122 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
err = DecodeRowValToChunk(e.BaseExecutor.Ctx(), e.Schema(), e.tblInfo, e.handle, val, req, e.rowDecoder)

sctx := e.BaseExecutor.Ctx()
schema := e.Schema()
err = DecodeRowValToChunk(sctx, schema, e.tblInfo, e.handle, val, req, e.rowDecoder)
if err != nil {
return err
}

err = fillRowChecksum(sctx, 0, 1, schema, e.tblInfo, [][]byte{val}, []kv.Handle{e.handle}, req, nil)
if err != nil {
return err
}

err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex,
e.Schema().Columns, e.columns, e.Ctx().GetExprCtx(), req)
schema.Columns, e.columns, sctx.GetExprCtx(), req)
if err != nil {
return err
}
return nil
}

// shouldFillRowChecksum scans the output schema for the column whose ID is
// model.ExtraRowChecksumID. It returns that column's position within
// schema.Columns and true when such a column exists; otherwise it returns
// 0 and false.
func shouldFillRowChecksum(schema *expression.Schema) (int, bool) {
	columns := schema.Columns
	for i := 0; i < len(columns); i++ {
		if columns[i].ID != model.ExtraRowChecksumID {
			continue
		}
		return i, true
	}
	return 0, false
}

// fillRowChecksum computes a checksum for every row in [start, end) of
// values/handles and installs the results as the checksum column of req.
//
// It is a no-op when schema contains no column with ID
// model.ExtraRowChecksumID. Otherwise, for each row:
//   - a value for which rowcodec.IsNewFormat reports false yields NULL;
//   - the value and the handle are decoded into a datum map keyed by column
//     ID, using the field types of all columns in tblInfo;
//   - a column absent from that map is given its origin default value;
//   - the columns are wrapped in a rowcodec.RowData (sorted if needed) whose
//     Checksum is appended as a base-10 string.
//
// buf is handed to rowcodec.RowData as its Data field.
// NOTE(review): presumably a reusable scratch buffer; the visible callers
// pass nil.
//
// Any decoding, default-value or checksum error is returned unchanged and
// req is left untouched in that case.
func fillRowChecksum(
	sctx sessionctx.Context,
	start, end int,
	schema *expression.Schema, tblInfo *model.TableInfo,
	values [][]byte, handles []kv.Handle,
	req *chunk.Chunk, buf []byte,
) error {
	checksumIdx, found := shouldFillRowChecksum(schema)
	if !found {
		return nil
	}

	// Collect the IDs of the columns that are encoded in the handle rather
	// than in the row value.
	var handleColIDs []int64
	switch {
	case tblInfo.PKIsHandle:
		pkCol := tblInfo.GetPkColInfo()
		handleColIDs = []int64{pkCol.ID}
	case tblInfo.IsCommonHandle:
		primary := tables.FindPrimaryIndex(tblInfo)
		for _, idxCol := range primary.Columns {
			handleColIDs = append(handleColIDs, tblInfo.Columns[idxCol.Offset].ID)
		}
	}

	// Field types of every table column, keyed by column ID.
	fieldTypes := make(map[int64]*types.FieldType)
	for i := range tblInfo.Columns {
		info := tblInfo.Columns[i]
		fieldTypes[info.ID] = &info.FieldType
	}

	loc := sctx.GetSessionVars().TimeZone
	checksumFt := []*types.FieldType{schema.Columns[checksumIdx].GetType()}
	result := chunk.NewChunkWithCapacity(checksumFt, req.Capacity())

	for rowIdx := start; rowIdx < end; rowIdx++ {
		handle, rawRow := handles[rowIdx], values[rowIdx]
		if !rowcodec.IsNewFormat(rawRow) {
			result.AppendNull(0)
			continue
		}

		datums, err := tablecodec.DecodeRowWithMapNew(rawRow, fieldTypes, loc, nil)
		if err != nil {
			return err
		}
		if datums, err = tablecodec.DecodeHandleToDatumMap(handle, handleColIDs, fieldTypes, loc, datums); err != nil {
			return err
		}

		cols := make([]rowcodec.ColData, 0, len(tblInfo.Columns))
		for _, col := range tblInfo.Columns {
			datum, stored := datums[col.ID]
			if !stored {
				// The column is not present in the decoded data, i.e. it was
				// not stored with the row. This can happen after an
				// `add column`; fall back to the column's origin default.
				colInfo := getColInfoByID(tblInfo, col.ID)
				datum, err = table.GetColOriginDefaultValue(sctx.GetExprCtx(), colInfo)
				if err != nil {
					return err
				}
				datums[col.ID] = datum
			}
			cols = append(cols, rowcodec.ColData{
				ColumnInfo: col,
				Datum:      &datum,
			})
		}

		row := rowcodec.RowData{
			Cols: cols,
			Data: buf,
		}
		if !sort.IsSorted(row) {
			sort.Sort(row)
		}
		sum, err := row.Checksum(loc)
		if err != nil {
			return err
		}
		result.AppendString(0, strconv.FormatUint(uint64(sum), 10))
	}

	// Replace the checksum column of req with the freshly computed values.
	req.SetCol(checksumIdx, result.Column(0))
	return nil
}

func (e *PointGetExecutor) getAndLock(ctx context.Context, key kv.Key) (val []byte, err error) {
if e.Ctx().GetSessionVars().IsPessimisticReadConsistency() {
// Only Lock the existing keys in RC isolation.
Expand Down
2 changes: 1 addition & 1 deletion pkg/expression/integration_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 25,
shard_count = 27,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
96 changes: 77 additions & 19 deletions pkg/expression/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2963,48 +2963,106 @@ PARTITION BY RANGE (c) (
tk.MustExec("set global tidb_enable_local_txn = off;")
}

func TestTiDBRowChecksumBuiltin(t *testing.T) {
store := testkit.CreateMockStore(t)

checksum := func(cols ...any) uint32 {
buf := make([]byte, 0, 64)
for _, col := range cols {
switch x := col.(type) {
case int:
buf = binary.LittleEndian.AppendUint64(buf, uint64(x))
case string:
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(x)))
buf = append(buf, []byte(x)...)
}
// calculateChecksum builds the checksum the tests expect for a row made of
// cols and returns it formatted as a base-10 string.
//
// Each column is appended to a byte buffer in order:
//   - int:    the value as a little-endian uint64;
//   - string: the byte length as a little-endian uint32, followed by the bytes.
//
// Values of any other type contribute nothing to the buffer. The result is the
// CRC-32 (IEEE polynomial) of the whole buffer, so every column takes part in
// the checksum.
func calculateChecksum(cols ...any) string {
	buf := make([]byte, 0, 64)
	for _, col := range cols {
		switch x := col.(type) {
		case int:
			buf = binary.LittleEndian.AppendUint64(buf, uint64(x))
		case string:
			buf = binary.LittleEndian.AppendUint32(buf, uint32(len(x)))
			buf = append(buf, x...)
		}
	}
	return fmt.Sprintf("%d", crc32.ChecksumIEEE(buf))
}

// TestTiDBRowChecksumBuiltinAfterDropColumn verifies that the value returned
// by tidb_row_checksum() for a row looked up by its primary key changes after
// one of the table's columns is dropped.
func TestTiDBRowChecksumBuiltinAfterDropColumn(t *testing.T) {
	const query = "select tidb_row_checksum() from t where a = 1"

	tk := testkit.NewTestKit(t, testkit.CreateMockStore(t))
	for _, stmt := range []string{
		"set global tidb_enable_row_level_checksum = 1",
		"use test",
		"create table t(a int primary key, b int, c int)",
		"insert into t values(1, 1, 1)",
	} {
		tk.MustExec(stmt)
	}

	// Checksum while column b still exists.
	before := tk.MustQuery(query).Rows()[0][0].(string)

	tk.MustExec("alter table t drop column b")

	// Checksum of the same row once b is gone.
	after := tk.MustQuery(query).Rows()[0][0].(string)

	require.NotEqual(t, before, after)
}

// TestTiDBRowChecksumBuiltinAfterAddColumn verifies tidb_row_checksum() for a
// row looked up by its primary key before and after a column is added.
//
// Before the change the result must equal calculateChecksum(1, 1); after
// adding column c with default 1 it must equal calculateChecksum(1, 1, 1),
// and the two results must differ.
func TestTiDBRowChecksumBuiltinAfterAddColumn(t *testing.T) {
	const query = "select tidb_row_checksum() from t where a = 1"

	tk := testkit.NewTestKit(t, testkit.CreateMockStore(t))
	for _, stmt := range []string{
		"set global tidb_enable_row_level_checksum = 1",
		"use test",
		"create table t(a int primary key, b int)",
		"insert into t values(1, 1)",
	} {
		tk.MustExec(stmt)
	}

	// Two-column row as originally written.
	before := tk.MustQuery(query).Rows()[0][0].(string)
	require.Equal(t, calculateChecksum(1, 1), before)

	tk.MustExec("alter table t add column c int default 1")

	// The stored row has no value for c; the checksum must still cover it.
	after := tk.MustQuery(query).Rows()[0][0].(string)
	require.Equal(t, calculateChecksum(1, 1, 1), after)

	require.NotEqual(t, before, after)
}

func TestTiDBRowChecksumBuiltin(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("create table t (id int primary key, c int)")

// row with 2 checksums
// row with 1 checksum
tk.MustExec("insert into t values (1, 10)")
tk.MustExec("alter table t change column c c varchar(10)")
checksum1 := fmt.Sprintf("%d,%d", checksum(1, 10), checksum(1, "10"))
// row with 1 checksum
checksum1 := calculateChecksum(1, "10")
checksum11 := fmt.Sprintf("%d %v %v", 1, "10", checksum1)

tk.Session().GetSessionVars().EnableRowLevelChecksum = true
tk.MustExec("insert into t values (2, '20')")
checksum2 := fmt.Sprintf("%d", checksum(2, "20"))
checksum2 := calculateChecksum(2, "20")
checksum22 := fmt.Sprintf("%d %v %v", 2, checksum2, "20")

// row without checksum
tk.Session().GetSessionVars().EnableRowLevelChecksum = false
tk.MustExec("insert into t values (3, '30')")
checksum3 := "<nil>"
checksum3 := calculateChecksum(3, "30")
checksum33 := fmt.Sprintf("%v %d %v", checksum3, 3, "30")

// fast point-get
tk.MustQuery("select tidb_row_checksum() from t where id = 1").Check(testkit.Rows(checksum1))
tk.MustQuery("select id, c, tidb_row_checksum() from t where id = 1").Check(testkit.Rows(checksum11))

tk.MustQuery("select tidb_row_checksum() from t where id = 2").Check(testkit.Rows(checksum2))
tk.MustQuery("select id, tidb_row_checksum(), c from t where id = 2").Check(testkit.Rows(checksum22))

tk.MustQuery("select tidb_row_checksum() from t where id = 3").Check(testkit.Rows(checksum3))
tk.MustQuery("select tidb_row_checksum(), id, c from t where id = 3").Check(testkit.Rows(checksum33))

// fast batch-point-get
tk.MustQuery("select tidb_row_checksum() from t where id in (1, 2, 3)").Check(testkit.Rows(checksum1, checksum2, checksum3))

tk.MustQuery("select id, c, tidb_row_checksum() from t where id in (1, 2, 3)").
Check(testkit.Rows(
checksum11,
fmt.Sprintf("%d %v %v", 2, "20", checksum2),
fmt.Sprintf("%d %v %v", 3, "30", checksum3),
))

// non-fast point-get
tk.MustGetDBError("select length(tidb_row_checksum()) from t where id = 1", expression.ErrNotSupportedYet)
tk.MustGetDBError("select c from t where id = 1 and tidb_row_checksum() is not null", expression.ErrNotSupportedYet)
Expand Down
38 changes: 18 additions & 20 deletions pkg/tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,26 +543,24 @@ func DecodeHandleToDatumMap(handle kv.Handle, handleColIDs []int64,
if row == nil {
row = make(map[int64]types.Datum, len(cols))
}
for id, ft := range cols {
for idx, hid := range handleColIDs {
if id != hid {
continue
}
if types.NeedRestoredData(ft) {
continue
}
d, err := decodeHandleToDatum(handle, ft, idx)
if err != nil {
return row, err
}
d, err = Unflatten(d, ft, loc)
if err != nil {
return row, err
}
if _, exists := row[id]; !exists {
row[id] = d
}
break
for idx, id := range handleColIDs {
ft, ok := cols[id]
if !ok {
continue
}
if types.NeedRestoredData(ft) {
continue
}
d, err := decodeHandleToDatum(handle, ft, idx)
if err != nil {
return row, err
}
d, err = Unflatten(d, ft, loc)
if err != nil {
return row, err
}
if _, exists := row[id]; !exists {
row[id] = d
}
}
return row, nil
Expand Down
6 changes: 1 addition & 5 deletions pkg/util/rowcodec/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,7 @@ func (decoder *ChunkDecoder) DecodeToChunk(rowData []byte, handle kv.Handle, chk
continue
}
if col.ID == model.ExtraRowChecksumID {
if v := decoder.row.getChecksumInfo(); len(v) > 0 {
chk.AppendString(colIdx, v)
} else {
chk.AppendNull(colIdx)
}
chk.AppendNull(colIdx)
continue
}

Expand Down
12 changes: 0 additions & 12 deletions pkg/util/rowcodec/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package rowcodec

import (
"encoding/binary"
"strconv"
)

const (
Expand Down Expand Up @@ -101,17 +100,6 @@ func (r *row) setChecksums(checksums ...uint32) {
}
}

// getChecksumInfo renders the checksums carried by the row as text.
//
// It returns the empty string when hasChecksum reports false. Otherwise it
// returns checksum1 in base 10, followed by "," and checksum2 in base 10
// when hasExtraChecksum reports true.
func (r *row) getChecksumInfo() string {
	if !r.hasChecksum() {
		return ""
	}
	info := strconv.FormatUint(uint64(r.checksum1), 10)
	if !r.hasExtraChecksum() {
		return info
	}
	return info + "," + strconv.FormatUint(uint64(r.checksum2), 10)
}

func (r *row) getData(i int) []byte {
var start, end uint32
if r.large() {
Expand Down

0 comments on commit 6d92e7a

Please sign in to comment.