Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Sep 12, 2023
1 parent 7763eba commit 9957a20
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 24 deletions.
10 changes: 6 additions & 4 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error {
if !t.shouldSplitTxn(sinkScheme) {
if !t.shouldSplitUpdateEvent(sinkScheme) {
return nil
}

Expand All @@ -785,12 +785,14 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error {
return nil
}

func (t *SingleTableTxn) shouldSplitTxn(sinkScheme string) bool {
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
// For mysql sink, we do not split single-row transactions to restore
// upstream events as much as possible.
if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) {
return false
}
// For MQ or storage sink, we need to split the transaction with single row, since some
// protocols (such as csv and avro) do not support old value.
// For MQ and storage sink, we need to split the transaction with single row, since some
// protocols (such as csv and avro) do not support old value in update event.
return true
}

Expand Down
5 changes: 1 addition & 4 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
x, size, err = convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e)
if err != nil {
return errors.Trace(err)
}
x, size = convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e)
usedMemSize += size
rows = append(rows, x...)
rowsSize += size
Expand Down
5 changes: 1 addition & 4 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
x, size, err := convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e)
if err != nil {
return err
}
x, size := convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e)
events = append(events, x...)
allEventSize += size
usedMem += size
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint6
func convertRowChangedEvents(
changefeed model.ChangeFeedID, tableID model.TableID, enableOldValue bool,
events ...*model.PolymorphicEvent,
) ([]*model.RowChangedEvent, uint64, error) {
) ([]*model.RowChangedEvent, uint64) {
size := 0
rowChangedEvents := make([]*model.RowChangedEvent, 0, len(events))
for _, e := range events {
Expand Down Expand Up @@ -472,7 +472,7 @@ func convertRowChangedEvents(
size += e.Row.ApproximateBytes()
rowChangedEvents = append(rowChangedEvents, e.Row)
}
return rowChangedEvents, uint64(size), nil
return rowChangedEvents, uint64(size)
}

func genReplicateTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
Expand Down
15 changes: 5 additions & 10 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ func TestConvertNilRowChangedEvents(t *testing.T) {
changefeedID := model.DefaultChangeFeedID("1")
tableID := model.TableID(1)
enableOldVlaue := false
result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldVlaue, events...)
require.NoError(t, err)
result, size := convertRowChangedEvents(changefeedID, tableID, enableOldVlaue, events...)
require.Equal(t, 0, len(result))
require.Equal(t, uint64(0), size)
}
Expand All @@ -167,8 +166,7 @@ func TestConvertEmptyRowChangedEvents(t *testing.T) {
changefeedID := model.DefaultChangeFeedID("1")
tableID := model.TableID(1)
enableOldValue := false
result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.NoError(t, err)
result, size := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.Equal(t, 0, len(result))
require.Equal(t, uint64(0), size)
}
Expand Down Expand Up @@ -219,8 +217,7 @@ func TestConvertRowChangedEventsWhenEnableOldValue(t *testing.T) {
changefeedID := model.DefaultChangeFeedID("1")
tableID := model.TableID(1)
enableOldValue := true
result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.NoError(t, err)
result, size := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.Equal(t, 1, len(result))
require.Equal(t, uint64(216), size)
}
Expand Down Expand Up @@ -272,8 +269,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) {
changefeedID := model.DefaultChangeFeedID("1")
tableID := model.TableID(1)
enableOldValue := false
result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.NoError(t, err)
result, size := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.Equal(t, 1, len(result))
require.Equal(t, uint64(216), size)

Expand Down Expand Up @@ -318,8 +314,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) {
},
},
}
result, size, err = convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.NoError(t, err)
result, size = convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.Equal(t, 1, len(result))
require.Equal(t, uint64(216), size)
}
Expand Down

0 comments on commit 9957a20

Please sign in to comment.