Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): split RowChangeEvent if unique key is updated #9437

Merged
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package model

import (
"fmt"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/util"
Expand All @@ -47,6 +49,13 @@ const (
MessageTypeResolved
)

// Sort ranks for the RowChangedEvents of one transaction.
// A lower rank is ordered first: delete < update < insert.
const (
	_ = iota // rank 0 is not used; ranks start at 1
	// typeDelete is the rank of delete events.
	typeDelete
	// typeUpdate is the rank of update events.
	typeUpdate
	// typeInsert is the rank of insert events.
	typeInsert
)

// ColumnFlagType is for encapsulating the flag operations for different flags.
type ColumnFlagType util.Flag

Expand Down Expand Up @@ -261,6 +270,11 @@ func (r *RedoLog) GetCommitTs() Ts {
return 0
}

// TrySplitAndSortUpdateEvent is a no-op for RedoLog: the log is left
// untouched and nil is always returned.
func (r *RedoLog) TrySplitAndSortUpdateEvent() error {
return nil
}

// RedoRowChangedEvent represents the DML event used in RedoLog
type RedoRowChangedEvent struct {
Row *RowChangedEvent `msg:"row"`
Expand Down Expand Up @@ -332,11 +346,43 @@ type RowChangedEvent struct {
ReplicatingTs Ts `json:"-" msg:"-"`
}

// txnRows is a list of row changed events that belong to the same transaction.
// Its Len, Less and Swap methods let it be sorted with the sort package,
// ordering events by DML type: delete < update < insert.
type txnRows []*RowChangedEvent
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved

// Len returns the number of events in e.
func (e txnRows) Len() int {
return len(e)
}

// Less reports whether the event at index i must be ordered before the event
// at index j. Events are ranked by DML type only: delete < update < insert.
func (e txnRows) Less(i, j int) bool {
	lhs, rhs := getDMLOrder(e[i]), getDMLOrder(e[j])
	return lhs < rhs
}

// getDMLOrder maps an event to its sort rank: typeDelete for delete events,
// typeUpdate for update events, and typeInsert for everything else.
func getDMLOrder(event *RowChangedEvent) int {
	switch {
	case event.IsDelete():
		return typeDelete
	case event.IsUpdate():
		return typeUpdate
	default:
		return typeInsert
	}
}

// Swap exchanges the events at indexes i and j.
func (e txnRows) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}

// GetCommitTs returns the commit timestamp of this event, i.e. its CommitTs field.
func (r *RowChangedEvent) GetCommitTs() uint64 {
return r.CommitTs
}

// TrySplitAndSortUpdateEvent is a no-op for a single RowChangedEvent: the
// event is left unchanged and nil is always returned.
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent() error {
return nil
}

// IsDelete returns true if the row is a delete event
func (r *RowChangedEvent) IsDelete() bool {
return len(r.PreColumns) != 0 && len(r.Columns) == 0
Expand Down Expand Up @@ -719,6 +765,112 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
return t.CommitTs
}

// TrySplitAndSortUpdateEvent rewrites t.Rows so that every update event that
// changes the value of a handle key or unique key column is replaced by a
// delete event followed by an insert event.
//
// Transactions holding fewer than two rows are returned untouched. Otherwise
// t.Rows is replaced by the slice returned from trySplitAndSortUpdateEvent,
// which also drops nil and empty row events. If at least one update was
// split, the new rows are sorted by DML type (delete < update < insert).
//
// An error from the split helper is returned wrapped with errors.Trace; in
// that case t.Rows is not modified.
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
// A transaction with zero or one row is left as-is; no split is attempted.
if len(t.Rows) < 2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible there is only an update event, shall we split it here ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, it's not needed to split a single update event.

Copy link
Contributor

@3AceShowHand 3AceShowHand Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this update event has unique key columns changed? I think this update event also should be splitted

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is almost identical to the convertRowChangedEvents method.

After the enable-old-value is removed, this 2 method can be merged into one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this pr focuses on the duplicated key case, the pre-condition is more than two update events emitted.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convertRowChangedEvents checks "enable-old-value" and handle key.

return nil
}
newRows, split, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
}
// At least one update event was split, so the rows must be re-sorted by DML type.
// NOTE(review): sort.Sort is not guaranteed to be stable, so events of the
// same DML type may change their relative order — confirm this is acceptable,
// or switch to sort.Stable.
if split {
sdojjy marked this conversation as resolved.
Show resolved Hide resolved
sort.Sort(txnRows(newRows))
}
t.Rows = newRows
return nil
}

// trySplitAndSortUpdateEvent builds a new event slice from events in which every
// update event that shouldSplitUpdateEvent accepts is replaced by a delete event
// followed by an insert event. Nil events and events with neither Columns nor
// PreColumns are logged and dropped; all other events are copied over unchanged.
//
// It returns the new slice, a flag that is true when at least one update event
// was split, and any error from splitUpdateEvent wrapped with errors.Trace.
// Despite its name this function does not sort; the returned slice keeps the
// input order and sorting is left to the caller.
func trySplitAndSortUpdateEvent(
events []*RowChangedEvent,
) ([]*RowChangedEvent, bool, error) {
// Capacity is only a starting point: each split adds one extra event.
rowChangedEvents := make([]*RowChangedEvent, 0, len(events))
needSplit := false
sdojjy marked this conversation as resolved.
Show resolved Hide resolved
for _, e := range events {
if e == nil {
log.Warn("skip emit nil event",
zap.Any("event", e))
continue
}

colLen := len(e.Columns)
preColLen := len(e.PreColumns)
// Some transactions can generate an empty row change event, such as
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Such events are ignored.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event",
zap.Any("event", e))
continue
}

// For an update event whose primary key or unique key value changed,
// emit a delete event and an insert event instead of the update.
if e.IsUpdate() && shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if err != nil {
return nil, false, errors.Trace(err)
}
needSplit = true
rowChangedEvents = append(rowChangedEvents, deleteEvent, insertEvent)
} else {
rowChangedEvents = append(rowChangedEvents, e)
}
}
return rowChangedEvents, needSplit, nil
}

// shouldSplitUpdateEvent reports whether updateEvent has to be split into a
// delete event and an insert event (see splitUpdateEvent).
//
// It returns true as soon as it finds an index i where Columns[i] and
// PreColumns[i] are both non-nil, both carry the unique-key or handle-key flag,
// and their values differ when rendered with ColumnValueString.
// It returns false for a nil event or when no such column exists.
func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// A nil event is never split.
if updateEvent == nil {
return false
}

// NOTE(review): PreColumns is indexed with the index taken from Columns, which
// assumes len(PreColumns) >= len(Columns); a shorter PreColumns would panic
// here — confirm the caller guarantees equal lengths.
for i := range updateEvent.Columns {
col := updateEvent.Columns[i]
preCol := updateEvent.PreColumns[i]
if col != nil && (col.Flag.IsUniqueKey() || col.Flag.IsHandleKey()) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this condition " col.Flag.IsHandleKey()) && preCol != nil && (preCol.Flag.IsUniqueKey()" mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we check if the column is a part of the handle key or a unique key, if so we should check if the value has been changed,
for the condition " col.Flag.IsHandleKey()) && preCol != nil && (preCol.Flag.IsUniqueKey()" I think it's safe to split the row because we can always split an update to delete + insert.

preCol != nil && (preCol.Flag.IsUniqueKey() || preCol.Flag.IsHandleKey()) {
// Values are compared through their string form.
colValueString := ColumnValueString(col.Value)
preColValueString := ColumnValueString(preCol.Value)
// A single changed key column is enough to require a split.
if colValueString != preColValueString {
return true
}
}
}
return false
}

// splitUpdateEvent turns one update event into a delete event and an insert
// event, returned in that order. A nil updateEvent yields an error.
//
// Both results are shallow copies of updateEvent: the delete event has its
// Columns cleared and keeps PreColumns, the insert event has its PreColumns
// cleared and keeps Columns. A deep copy is not made because each result only
// uses the column slice the other one drops, so the copies do not interfere
// and no extra memory is spent.
func splitUpdateEvent(
	updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error) {
	if updateEvent == nil {
		return nil, nil, errors.New("nil event cannot be split")
	}

	del, ins := *updateEvent, *updateEvent
	// The delete event carries only the old row image.
	del.Columns = nil
	// The insert event carries only the new row image.
	ins.PreColumns = nil

	return &del, &ins, nil
}

// Append adds a row changed event into SingleTableTxn
func (t *SingleTableTxn) Append(row *RowChangedEvent) {
if row.StartTs != t.StartTs || row.CommitTs != t.CommitTs || row.Table.TableID != t.Table.TableID {
Expand Down
167 changes: 167 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package model

import (
"sort"
"testing"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -396,3 +398,168 @@ func TestExchangeTablePartition(t *testing.T) {
require.Equal(t, "ALTER TABLE `test1`.`t1` EXCHANGE PARTITION `p0` WITH TABLE `test2`.`t2`", event.Query)
require.Equal(t, event.Type, timodel.ActionExchangeTablePartition)
}

// TestSortRowChangedEvent checks that sorting txnRows orders events by DML
// type: delete first, then update, then insert.
func TestSortRowChangedEvent(t *testing.T) {
// Input order is update, insert, delete.
events := []*RowChangedEvent{
{
PreColumns: []*Column{{}},
Columns: []*Column{{}},
},
{
Columns: []*Column{{}},
},
{
PreColumns: []*Column{{}},
},
}
// Sanity-check the fixture before sorting.
assert.True(t, events[0].IsUpdate())
assert.True(t, events[1].IsInsert())
assert.True(t, events[2].IsDelete())
sort.Sort(txnRows(events))
sdojjy marked this conversation as resolved.
Show resolved Hide resolved
// Expected order after sorting is delete, update, insert.
assert.True(t, events[0].IsDelete())
assert.True(t, events[1].IsUpdate())
assert.True(t, events[2].IsInsert())
}

func TestTrySplitAndSortUpdateEventNil(t *testing.T) {
t.Parallel()

events := []*RowChangedEvent{nil}
result, split, err := trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 0, len(result))
require.False(t, split)
}

// TestTrySplitAndSortUpdateEventEmpty checks that an event with neither Columns
// nor PreColumns is dropped from the result and is not reported as a split.
func TestTrySplitAndSortUpdateEventEmpty(t *testing.T) {
	t.Parallel()

	emptyRow := &RowChangedEvent{StartTs: 1, CommitTs: 2}
	rows, didSplit, err := trySplitAndSortUpdateEvent([]*RowChangedEvent{emptyRow})
	require.NoError(t, err)
	require.Equal(t, 0, len(rows))
	require.False(t, didSplit)
}

// TestTrySplitAndSortUpdateEvent checks that a single update event is split into
// two events only when the value of a handle-key or unique-key column changes.
func TestTrySplitAndSortUpdateEvent(t *testing.T) {
	t.Parallel()

	// Column builders shared by all scenarios: col1 is a non-key column,
	// col2 is either a handle-key or a unique-key column.
	plain := func(value string) *Column {
		return &Column{Name: "col1", Flag: BinaryFlag, Value: value}
	}
	handleKey := func(value string) *Column {
		return &Column{Name: "col2", Flag: HandleKeyFlag, Value: value}
	}
	uniqueKey := func(value string) *Column {
		return &Column{Name: "col2", Flag: UniqueKeyFlag, Value: value}
	}

	scenarios := []struct {
		columns    []*Column
		preColumns []*Column
		wantLen    int
		wantSplit  bool
	}{
		{
			// Update handle key.
			columns:    []*Column{plain("col1-value-updated"), handleKey("col2-value-updated")},
			preColumns: []*Column{plain("col1-value"), handleKey("col2-value")},
			wantLen:    2,
			wantSplit:  true,
		},
		{
			// Update unique key.
			columns:    []*Column{plain("col1-value-updated"), uniqueKey("col2-value-updated")},
			preColumns: []*Column{plain("col1-value"), uniqueKey("col2-value")},
			wantLen:    2,
			wantSplit:  true,
		},
		{
			// Update non-handle key.
			columns:    []*Column{plain("col1-value-updated"), handleKey("col2-value")},
			preColumns: []*Column{plain("col1-value"), handleKey("col2-value")},
			wantLen:    1,
			wantSplit:  false,
		},
	}

	for _, sc := range scenarios {
		input := []*RowChangedEvent{
			{
				CommitTs:   1,
				Columns:    sc.columns,
				PreColumns: sc.preColumns,
			},
		}
		result, split, err := trySplitAndSortUpdateEvent(input)
		require.NoError(t, err)
		require.Equal(t, sc.wantLen, len(result))
		require.Equal(t, sc.wantSplit, split)
	}
}
2 changes: 2 additions & 0 deletions cdc/sink/dmlsink/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
// TrySplitAndSortUpdateEvent splits an update event into a delete event and
// an insert event when a unique key is updated. Implementations for which
// splitting does not apply may simply return nil.
TrySplitAndSortUpdateEvent() error
}

// CallbackFunc is the callback function for callbackable event.
Expand Down
Loading