Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): split RowChangeEvent if unique key is updated (#9437) #9559

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package model

import (
"fmt"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/util"
Expand All @@ -47,6 +49,13 @@ const (
MessageTypeResolved
)

const (
// the RowChangedEvent order in the same transaction
typeDelete = iota + 1
typeUpdate
typeInsert
)

// ColumnFlagType is for encapsulating the flag operations for different flags.
type ColumnFlagType util.Flag

Expand Down Expand Up @@ -261,6 +270,11 @@ func (r *RedoLog) GetCommitTs() Ts {
return 0
}

// TrySplitAndSortUpdateEvent redo log do nothing
func (r *RedoLog) TrySplitAndSortUpdateEvent() error {
return nil
}

// RedoRowChangedEvent represents the DML event used in RedoLog
type RedoRowChangedEvent struct {
Row *RowChangedEvent `msg:"row"`
Expand Down Expand Up @@ -332,11 +346,43 @@ type RowChangedEvent struct {
ReplicatingTs Ts `json:"-" msg:"-"`
}

// txnRows represents a set of events that belong to the same transaction.
type txnRows []*RowChangedEvent

// Len is the number of elements in the collection.
func (e txnRows) Len() int {
return len(e)
}

// Less sort the events base on the order of event type delete<update<insert
func (e txnRows) Less(i, j int) bool {
return getDMLOrder(e[i]) < getDMLOrder(e[j])
}

// getDMLOrder returns the order of the dml types: delete<update<insert
func getDMLOrder(event *RowChangedEvent) int {
if event.IsDelete() {
return typeDelete
} else if event.IsUpdate() {
return typeUpdate
}
return typeInsert
}

func (e txnRows) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}

// GetCommitTs returns the commit timestamp of this event.
func (r *RowChangedEvent) GetCommitTs() uint64 {
return r.CommitTs
}

// TrySplitAndSortUpdateEvent do nothing
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent() error {
return nil
}

// IsDelete returns true if the row is a delete event
func (r *RowChangedEvent) IsDelete() bool {
return len(r.PreColumns) != 0 && len(r.Columns) == 0
Expand Down Expand Up @@ -699,6 +745,112 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
return t.CommitTs
}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
if len(t.Rows) < 2 {
return nil
}
newRows, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
}
t.Rows = newRows
return nil
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
// returns true if some updated events is split
func trySplitAndSortUpdateEvent(
events []*RowChangedEvent,
) ([]*RowChangedEvent, error) {
rowChangedEvents := make([]*RowChangedEvent, 0, len(events))
split := false
for _, e := range events {
if e == nil {
log.Warn("skip emit nil event",
zap.Any("event", e))
continue
}

colLen := len(e.Columns)
preColLen := len(e.PreColumns)
// Some transactions could generate empty row change event, such as
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event",
zap.Any("event", e))
continue
}

// This indicates that it is an update event. if the pk or uk is updated,
// we need to split it into two events (delete and insert).
if e.IsUpdate() && shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if err != nil {
return nil, errors.Trace(err)
}
split = true
rowChangedEvents = append(rowChangedEvents, deleteEvent, insertEvent)
} else {
rowChangedEvents = append(rowChangedEvents, e)
}
}
// some updated events is split, need to sort
if split {
sort.Sort(txnRows(rowChangedEvents))
}
return rowChangedEvents, nil
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
}

for i := range updateEvent.Columns {
col := updateEvent.Columns[i]
preCol := updateEvent.PreColumns[i]
if col != nil && (col.Flag.IsUniqueKey() || col.Flag.IsHandleKey()) &&
preCol != nil && (preCol.Flag.IsUniqueKey() || preCol.Flag.IsHandleKey()) {
colValueString := ColumnValueString(col.Value)
preColValueString := ColumnValueString(preCol.Value)
// If one unique key columns is updated, we need to split the event row.
if colValueString != preColValueString {
return true
}
}
}
return false
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(
updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error) {
if updateEvent == nil {
return nil, nil, errors.New("nil event cannot be split")
}

// If there is an update to handle key columns,
// we need to split the event into two events to be compatible with the old format.
// NOTICE: Here we don't need a full deep copy because
// our two events need Columns and PreColumns respectively,
// so it won't have an impact and no more full deep copy wastes memory.
deleteEvent := *updateEvent
deleteEvent.Columns = nil

insertEvent := *updateEvent
// NOTICE: clean up pre cols for insert event.
insertEvent.PreColumns = nil

return &deleteEvent, &insertEvent, nil
}

// Append adds a row changed event into SingleTableTxn
func (t *SingleTableTxn) Append(row *RowChangedEvent) {
if row.StartTs != t.StartTs || row.CommitTs != t.CommitTs || row.Table.TableID != t.Table.TableID {
Expand Down
167 changes: 167 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package model

import (
"sort"
"testing"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -396,3 +398,168 @@ func TestExchangeTablePartition(t *testing.T) {
require.Equal(t, "ALTER TABLE `test1`.`t1` EXCHANGE PARTITION `p0` WITH TABLE `test2`.`t2`", event.Query)
require.Equal(t, event.Type, timodel.ActionExchangeTablePartition)
}

func TestSortRowChangedEvent(t *testing.T) {
events := []*RowChangedEvent{
{
PreColumns: []*Column{{}},
Columns: []*Column{{}},
},
{
Columns: []*Column{{}},
},
{
PreColumns: []*Column{{}},
},
}
assert.True(t, events[0].IsUpdate())
assert.True(t, events[1].IsInsert())
assert.True(t, events[2].IsDelete())
sort.Sort(txnRows(events))
assert.True(t, events[0].IsDelete())
assert.True(t, events[1].IsUpdate())
assert.True(t, events[2].IsInsert())
}

func TestTrySplitAndSortUpdateEventNil(t *testing.T) {
t.Parallel()

events := []*RowChangedEvent{nil}
result, err := trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 0, len(result))
}

func TestTrySplitAndSortUpdateEventEmpty(t *testing.T) {
t.Parallel()

events := []*RowChangedEvent{
{
StartTs: 1,
CommitTs: 2,
},
}
result, err := trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 0, len(result))
}

func TestTrySplitAndSortUpdateEvent(t *testing.T) {
t.Parallel()

// Update handle key.
columns := []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col2",
Flag: HandleKeyFlag,
Value: "col2-value-updated",
},
}
preColumns := []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value",
},
{
Name: "col2",
Flag: HandleKeyFlag,
Value: "col2-value",
},
}

events := []*RowChangedEvent{
{
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
},
}
result, err := trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 2, len(result))
require.True(t, result[0].IsDelete())
require.True(t, result[1].IsInsert())

// Update unique key.
columns = []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col2",
Flag: UniqueKeyFlag,
Value: "col2-value-updated",
},
}
preColumns = []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value",
},
{
Name: "col2",
Flag: UniqueKeyFlag,
Value: "col2-value",
},
}

events = []*RowChangedEvent{
{
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
},
}
result, err = trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 2, len(result))
require.True(t, result[0].IsDelete())
require.True(t, result[0].IsDelete())
require.True(t, result[1].IsInsert())

// Update non-handle key.
columns = []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col2",
Flag: HandleKeyFlag,
Value: "col2-value",
},
}
preColumns = []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value",
},
{
Name: "col2",
Flag: HandleKeyFlag,
Value: "col2-value",
},
}

events = []*RowChangedEvent{
{
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
},
}
result, err = trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 1, len(result))
}
2 changes: 2 additions & 0 deletions cdc/sink/dmlsink/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
// TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated
TrySplitAndSortUpdateEvent() error
}

// CallbackFunc is the callback function for callbackable event.
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func New(
if err != nil {
return nil, err
}
s.rowSink = mqs
s.txnSink = mqs
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewDMLSink(ctx, sinkURI, cfg, errCh)
if err != nil {
Expand Down
Loading