From 9957a20d13c3ca5196bf7218217941fbe02493c1 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 12 Sep 2023 10:33:29 +0800 Subject: [PATCH] fix comments --- cdc/model/sink.go | 10 ++++++---- cdc/processor/sinkmanager/redo_log_worker.go | 5 +---- cdc/processor/sinkmanager/table_sink_worker.go | 5 +---- cdc/processor/sinkmanager/table_sink_wrapper.go | 4 ++-- .../sinkmanager/table_sink_wrapper_test.go | 15 +++++---------- 5 files changed, 15 insertions(+), 24 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 765adc5dc8b..1796afd588d 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -773,7 +773,7 @@ func (t *SingleTableTxn) GetCommitTs() uint64 { // TrySplitAndSortUpdateEvent split update events if unique key is updated func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error { - if !t.shouldSplitTxn(sinkScheme) { + if !t.shouldSplitUpdateEvent(sinkScheme) { return nil } @@ -785,12 +785,14 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error { return nil } -func (t *SingleTableTxn) shouldSplitTxn(sinkScheme string) bool { +func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool { + // For mysql sink, we do not split single-row transactions to restore + // upstream events as much as possible. if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) { return false } - // For MQ or storage sink, we need to split the transaction with single row, since some - // protocols (such as csv and avro) do not support old value. + // For MQ and storage sink, we need to split the transaction with single row, since some + // protocols (such as csv and avro) do not support old value in update event. return true } diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index 5b674ba568c..45880238332 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -281,10 +281,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e if e.Row != nil { // For all rows, we add table replicate ts, so mysql sink can determine safe-mode. e.Row.ReplicatingTs = task.tableSink.replicateTs - x, size, err = convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) - if err != nil { - return errors.Trace(err) - } + x, size = convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) usedMemSize += size rows = append(rows, x...) rowsSize += size diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 50609f8645c..3d0b7483684 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -381,10 +381,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e if e.Row != nil { // For all rows, we add table replicate ts, so mysql sink can determine safe-mode. e.Row.ReplicatingTs = task.tableSink.replicateTs - x, size, err := convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) - if err != nil { - return err - } + x, size := convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) events = append(events, x...) allEventSize += size usedMem += size diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 845e8b67f24..01e5a221b02 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -442,7 +442,7 @@ func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint6 func convertRowChangedEvents( changefeed model.ChangeFeedID, tableID model.TableID, enableOldValue bool, events ...*model.PolymorphicEvent, -) ([]*model.RowChangedEvent, uint64, error) { +) ([]*model.RowChangedEvent, uint64) { size := 0 rowChangedEvents := make([]*model.RowChangedEvent, 0, len(events)) for _, e := range events { @@ -472,7 +472,7 @@ func convertRowChangedEvents( size += e.Row.ApproximateBytes() rowChangedEvents = append(rowChangedEvents, e.Row) } - return rowChangedEvents, uint64(size), nil + return rowChangedEvents, uint64(size) } func genReplicateTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) { diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 283d1455f2f..e1612e90b53 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -145,8 +145,7 @@ func TestConvertNilRowChangedEvents(t *testing.T) { changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) enableOldVlaue := false - result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldVlaue, events...) - require.NoError(t, err) + result, size := convertRowChangedEvents(changefeedID, tableID, enableOldVlaue, events...) require.Equal(t, 0, len(result)) require.Equal(t, uint64(0), size) } @@ -167,8 +166,7 @@ func TestConvertEmptyRowChangedEvents(t *testing.T) { changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) enableOldValue := false - result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) - require.NoError(t, err) + result, size := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) require.Equal(t, 0, len(result)) require.Equal(t, uint64(0), size) } @@ -219,8 +217,7 @@ func TestConvertRowChangedEventsWhenEnableOldValue(t *testing.T) { changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) enableOldValue := true - result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) - require.NoError(t, err) + result, size := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) } @@ -272,8 +269,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) { changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) enableOldValue := false - result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) - require.NoError(t, err) + result, size := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) @@ -318,8 +314,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) { }, }, } - result, size, err = convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) - require.NoError(t, err) + result, size = convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) }