Skip to content

Commit

Permalink
fix csv update
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Sep 11, 2023
1 parent 963c08f commit 8043e4c
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 25 deletions.
15 changes: 13 additions & 2 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -771,10 +772,11 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
if len(t.Rows) < 2 {
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error {
if !t.shouldSplitTxn(sinkScheme) {
return nil
}

newRows, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
Expand All @@ -783,6 +785,15 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
return nil
}

func (t *SingleTableTxn) shouldSplitTxn(sinkScheme string) bool {
if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) {
return false
}
// For MQ or storage sink, we need to split the transaction with single row, since some
// protocols (such as csv and avro) do not support old value.
return true
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
// returns true if some updated events is split
func trySplitAndSortUpdateEvent(
Expand Down
20 changes: 0 additions & 20 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,26 +449,6 @@ func convertRowChangedEvents(
}

size += e.Row.ApproximateBytes()

// This indicates that it is an update event,
// and after enable old value internally by default(but disable in the configuration).
// We need to handle the update event to be compatible with the old format.
if e.Row.IsUpdate() && !enableOldValue {
if shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if err != nil {
return nil, 0, errors.Trace(err)
}
// NOTICE: Please do not change the order, the delete event always comes before the insert event.
rowChangedEvents = append(rowChangedEvents, deleteEvent.Row, insertEvent.Row)
} else {
// If the handle key columns are not updated, PreColumns is directly ignored.
e.Row.PreColumns = nil
rowChangedEvents = append(rowChangedEvents, e.Row)
}
} else {
rowChangedEvents = append(rowChangedEvents, e.Row)
}
}
return rowChangedEvents, uint64(size), nil
}
Expand Down
5 changes: 5 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -59,6 +60,10 @@ func (m *mockSink) GetWriteTimes() int {
return m.writeTimes
}

func (m *mockSink) Scheme() string {
return sink.BlackHoleScheme
}

func (m *mockSink) Close() {}

func (m *mockSink) Dead() <-chan struct{} {
Expand Down
6 changes: 6 additions & 0 deletions cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/pkg/sink"
"go.uber.org/zap"
)

Expand All @@ -42,6 +43,11 @@ func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChange
return nil
}

// Scheme returns the sink scheme.
func (s *Sink) Scheme() string {
return sink.BlackHoleScheme
}

// Close do nothing.
func (s *Sink) Close() {}

Expand Down
8 changes: 8 additions & 0 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"context"
"math"
"net/url"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -65,6 +66,7 @@ type eventFragment struct {
// It will send the events to cloud storage systems.
type dmlSink struct {
changefeedID model.ChangeFeedID
scheme string
// last sequence number
lastSeqNum uint64
// encodingWorkers defines a group of workers for encoding events.
Expand Down Expand Up @@ -131,6 +133,7 @@ func NewCloudStorageSink(
wgCtx, wgCancel := context.WithCancel(ctx)
s := &dmlSink{
changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx),
scheme: strings.ToLower(sinkURI.Scheme),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, sink.TxnSink),
Expand Down Expand Up @@ -241,6 +244,11 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single
return nil
}

// Scheme returns the sink scheme.
func (s *dmlSink) Scheme() string {
return s.scheme
}

// Close closes the cloud storage sink.
func (s *dmlSink) Close() {
if s.cancel != nil {
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
// TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated
TrySplitAndSortUpdateEvent() error
TrySplitAndSortUpdateEvent(sinkScheme string) error
}

// CallbackFunc is the callback function for callbackable event.
Expand Down
2 changes: 2 additions & 0 deletions cdc/sinkv2/eventsink/event_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type EventSink[E TableEvent] interface {
// WriteEvents writes events to the sink.
// This is an asynchronously and thread-safe method.
WriteEvents(events ...*CallbackableEvent[E]) error
// Scheme returns the sink scheme.
Scheme() string
// Close closes the sink. Can be called with `WriteEvents` concurrently.
Close()
// The EventSink meets internal errors and has been dead already.
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewKafkaDMLSink(
return nil, errors.Trace(err)
}

s, err := newSink(ctx, p, topicManager, eventRouter, encoderConfig,
s, err := newSink(ctx, sinkURI, p, topicManager, eventRouter, encoderConfig,
replicaConfig.Sink.EncoderConcurrency, errCh)
if err != nil {
return nil, errors.Trace(err)
Expand Down
11 changes: 10 additions & 1 deletion cdc/sinkv2/eventsink/mq/mq_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package mq

import (
"context"
"net/url"
"strings"
"sync"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -42,7 +44,8 @@ var _ eventsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil)
// It will send the events to the MQ system.
type dmlSink struct {
// id indicates this sink belongs to which processor(changefeed).
id model.ChangeFeedID
id model.ChangeFeedID
scheme string
// protocol indicates the protocol used by this sink.
protocol config.Protocol

Expand All @@ -65,6 +68,7 @@ type dmlSink struct {
}

func newSink(ctx context.Context,
sinkURI *url.URL,
producer dmlproducer.DMLProducer,
topicManager manager.TopicManager,
eventRouter *dispatcher.EventRouter,
Expand All @@ -86,6 +90,7 @@ func newSink(ctx context.Context,

s := &dmlSink{
id: changefeedID,
scheme: strings.ToLower(sinkURI.Scheme),
protocol: encoderConfig.Protocol,
ctx: ctx,
cancel: cancel,
Expand Down Expand Up @@ -168,6 +173,10 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single
return nil
}

func (s *dmlSink) Scheme() string {
return s.scheme
}

// Close closes the sink.
func (s *dmlSink) Close() {
if s.cancel != nil {
Expand Down
8 changes: 8 additions & 0 deletions cdc/sinkv2/eventsink/txn/txn_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package txn
import (
"context"
"net/url"
"strings"
"sync"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -46,6 +47,7 @@ type sink struct {
conflictDetector *causality.ConflictDetector[*worker, *txnEvent]
isDead bool
}
scheme string

workers []*worker
cancel func()
Expand Down Expand Up @@ -84,6 +86,7 @@ func NewMySQLSink(
backends = append(backends, impl)
}
sink := newSink(ctx, backends, errCh, conflictDetectorSlots)
sink.scheme = strings.ToLower(sinkURI.Scheme)
sink.statistics = statistics
sink.cancel = cancel

Expand Down Expand Up @@ -151,6 +154,11 @@ func (s *sink) WriteEvents(txnEvents ...*eventsink.TxnCallbackableEvent) error {
return nil
}

// Scheme returns the sink scheme.
func (s *sink) Scheme() string {
return s.scheme
}

// Close closes the sink. It won't wait for all pending items backend handled.
func (s *sink) Close() {
if s.cancel != nil {
Expand Down
5 changes: 5 additions & 0 deletions cdc/sinkv2/tablesink/table_sink_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand All @@ -39,6 +40,10 @@ func (m *mockEventSink) WriteEvents(rows ...*eventsink.TxnCallbackableEvent) err
return nil
}

func (m *mockEventSink) Scheme() string {
return sink.BlackHoleScheme
}

func (m *mockEventSink) Close() {
close(m.dead)
}
Expand Down

0 comments on commit 8043e4c

Please sign in to comment.