Skip to content

Commit

Permalink
config(ticdc): enable-old-value always false if using avro or csv as …
Browse files Browse the repository at this point in the history
…the encoding protocol (#9079)

close #9086
  • Loading branch information
3AceShowHand authored Jun 3, 2023
1 parent e1826b3 commit 6537ab8
Show file tree
Hide file tree
Showing 45 changed files with 595 additions and 404 deletions.
30 changes: 0 additions & 30 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/r3labs/diff"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -206,36 +205,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
if err != nil {
return nil, err
}
if !replicaCfg.EnableOldValue {
sinkURIParsed, err := url.Parse(cfg.SinkURI)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}

protocol := sinkURIParsed.Query().Get(config.ProtocolKey)
if protocol != "" {
replicaCfg.Sink.Protocol = util.AddressOf(protocol)
}
for _, fp := range config.ForceEnableOldValueProtocols {
if util.GetOrZero(replicaCfg.Sink.Protocol) == fp {
log.Warn(
"Attempting to replicate without old value enabled. "+
"CDC will enable old value and continue.",
zap.String(
"protocol",
util.GetOrZero(replicaCfg.Sink.Protocol),
),
)
replicaCfg.EnableOldValue = true
break
}
}

if replicaCfg.ForceReplicate {
return nil, cerror.ErrOldValueNotEnabled.GenWithStackByArgs(
"if use force replicate, old value feature must be enabled")
}
}
f, err := filter.NewFilter(replicaCfg, "")
if err != nil {
return nil, errors.Cause(err)
Expand Down
29 changes: 28 additions & 1 deletion cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.ReplicaConfig = GetDefaultReplicaConfig()
cfg.ReplicaConfig.ForceReplicate = true
cfg.ReplicaConfig.EnableOldValue = false
// disable old value but force replicate
cfg.SinkURI = "mysql://"
// disable old value but force replicate, and using mysql sink.
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)
require.Nil(t, cfInfo)
cfg.ReplicaConfig.ForceReplicate = false
cfg.ReplicaConfig.IgnoreIneligibleTable = true
cfg.SinkURI = "blackhole://"
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.Nil(t, err)
require.NotNil(t, cfInfo)
Expand Down Expand Up @@ -89,6 +91,19 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.SinkURI = string([]byte{0x7f, ' '})
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)

cfg.StartTs = 0
// use blackhole to workaround
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
cfg.ReplicaConfig.EnableOldValue = true
cfg.ReplicaConfig.ForceReplicate = false
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NoError(t, err)
require.False(t, cfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}

func TestVerifyUpdateChangefeedConfig(t *testing.T) {
Expand Down Expand Up @@ -140,4 +155,16 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
cfg.TargetTs = 9
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)

cfg.StartTs = 0
cfg.TargetTs = 0
cfg.ReplicaConfig.EnableOldValue = true
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NoError(t, err)
require.False(t, newCfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}
33 changes: 7 additions & 26 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type Mounter interface {
type mounter struct {
schemaStorage SchemaStorage
tz *time.Location
enableOldValue bool
changefeedID model.ChangeFeedID
filter pfilter.Filter
metricTotalRows prometheus.Gauge
Expand All @@ -98,14 +97,12 @@ func NewMounter(schemaStorage SchemaStorage,
changefeedID model.ChangeFeedID,
tz *time.Location,
filter pfilter.Filter,
enableOldValue bool,
integrity *integrity.Config,
) Mounter {
return &mounter{
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
filter: filter,
schemaStorage: schemaStorage,
changefeedID: changefeedID,
filter: filter,
metricTotalRows: totalRowsCountGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricIgnoredDMLEventCounter: ignoredDMLEventCounter.
Expand Down Expand Up @@ -336,7 +333,7 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
}

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool,
tableInfo *model.TableInfo, datums map[int64]types.Datum,
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))
Expand All @@ -358,11 +355,6 @@ func datum2Column(
colName := colInfo.Name.O
colID := colInfo.ID
colDatums, exist := datums[colID]
if !exist && !fillWithDefaultValue {
log.Debug("column value is not found",
zap.String("table", tableInfo.Name.O), zap.String("column", colName))
continue
}

var (
colValue interface{}
Expand All @@ -372,7 +364,7 @@ func datum2Column(
)
if exist {
colValue, size, warn, err = formatColVal(colDatums, colInfo)
} else if fillWithDefaultValue {
} else {
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
Expand Down Expand Up @@ -511,7 +503,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): use the pre table info to mount the pre column datum;
// the pre column and current column in one event may use different table info
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand All @@ -532,17 +524,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
}
corrupted = true
}

// NOTICE: When the old Value feature is off,
// the Delete event only needs to keep the handle key column.
if row.Delete && !m.enableOldValue {
for i := range preCols {
col := preCols[i]
if col != nil && !col.Flag.IsHandleKey() {
preCols[i] = nil
}
}
}
}

var (
Expand All @@ -551,7 +532,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
current uint32
)
if row.RowExist {
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true)
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down
24 changes: 10 additions & 14 deletions cdc/entry/mounter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ type MounterGroup interface {
}

type mounterGroup struct {
schemaStorage SchemaStorage
inputCh chan *model.PolymorphicEvent
tz *time.Location
filter filter.Filter
enableOldValue bool
integrity *integrity.Config
schemaStorage SchemaStorage
inputCh chan *model.PolymorphicEvent
tz *time.Location
filter filter.Filter
integrity *integrity.Config

workerNum int

Expand All @@ -56,7 +55,6 @@ const (
func NewMounterGroup(
schemaStorage SchemaStorage,
workerNum int,
enableOldValue bool,
filter filter.Filter,
tz *time.Location,
changefeedID model.ChangeFeedID,
Expand All @@ -66,11 +64,10 @@ func NewMounterGroup(
workerNum = defaultMounterWorkerNum
}
return &mounterGroup{
schemaStorage: schemaStorage,
inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize),
enableOldValue: enableOldValue,
filter: filter,
tz: tz,
schemaStorage: schemaStorage,
inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize),
filter: filter,
tz: tz,

integrity: integrity,

Expand Down Expand Up @@ -111,8 +108,7 @@ func (m *mounterGroup) WaitForReady(_ context.Context) {}
// Close is a no-op: the method body is empty, so calling it performs no work.
func (m *mounterGroup) Close() {}

func (m *mounterGroup) runWorker(ctx context.Context) error {
mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter,
m.enableOldValue, m.integrity)
mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter, m.integrity)
for {
select {
case <-ctx.Done():
Expand Down
16 changes: 6 additions & 10 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
filter, err := filter.NewFilter(config, "")
require.Nil(t, err)
mounter := NewMounter(scheamStorage,
model.DefaultChangeFeedID("c1"),
time.UTC, filter, false,
config.Integrity).(*mounter)
model.DefaultChangeFeedID("c1"), time.UTC, filter, config.Integrity).(*mounter)
mounter.tz = time.Local
ctx := context.Background()

Expand Down Expand Up @@ -1039,8 +1037,7 @@ func TestDecodeRowEnableChecksum(t *testing.T) {
ts := schemaStorage.GetLastSnapshot().CurrentTs()
schemaStorage.AdvanceResolvedTs(ver.Ver)

mounter := NewMounter(schemaStorage, changefeed, time.Local,
filter, true, replicaConfig.Integrity).(*mounter)
mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, replicaConfig.Integrity).(*mounter)

ctx := context.Background()

Expand Down Expand Up @@ -1121,7 +1118,7 @@ func TestDecodeRowEnableChecksum(t *testing.T) {
require.False(t, row.Checksum.Corrupted)

// hack the table info to make the checksum corrupted
tableInfo.Columns[0].ID = 3
tableInfo.Columns[0].FieldType = *types.NewFieldType(mysql.TypeVarchar)

// corrupt-handle-level defaults to warn, so there is no error, but the checksum is corrupted
row, err = mounter.unmarshalAndMountRowChanged(ctx, rawKV)
Expand Down Expand Up @@ -1170,8 +1167,7 @@ func TestDecodeRow(t *testing.T) {

schemaStorage.AdvanceResolvedTs(ver.Ver)

mounter := NewMounter(
schemaStorage, changefeed, time.Local, filter, true, cfg.Integrity).(*mounter)
mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, cfg.Integrity).(*mounter)

helper.Tk().MustExec(`insert into student values(1, "dongmen", 20, "male")`)
helper.Tk().MustExec(`update student set age = 27 where id = 1`)
Expand Down Expand Up @@ -1250,7 +1246,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) {

ts := schemaStorage.GetLastSnapshot().CurrentTs()
schemaStorage.AdvanceResolvedTs(ver.Ver)
mounter := NewMounter(schemaStorage, cfID, time.Local, f, true, cfg.Integrity).(*mounter)
mounter := NewMounter(schemaStorage, cfID, time.Local, f, cfg.Integrity).(*mounter)

type testCase struct {
schema string
Expand Down Expand Up @@ -1427,7 +1423,7 @@ func TestBuildTableInfo(t *testing.T) {
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
Expand Down
25 changes: 22 additions & 3 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math"
"net/url"
"regexp"
"strings"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -266,7 +267,7 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
// VerifyAndComplete verifies changefeed info and may fill in some fields.
// If a required field is not provided, return an error.
// If some necessary field is missing but can use a default value, fill it in.
func (info *ChangeFeedInfo) VerifyAndComplete() error {
func (info *ChangeFeedInfo) VerifyAndComplete() {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
Expand All @@ -292,8 +293,6 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error {
}

info.RmUnusedFields()

return nil
}

// RmUnusedFields removes unnecessary fields based on the downstream type and
Expand Down Expand Up @@ -390,6 +389,14 @@ func (info *ChangeFeedInfo) FixIncompatible() {
inheritV66 := creatorVersionGate.ChangefeedInheritSchedulerConfigFromV66()
info.fixScheduler(inheritV66)
log.Info("Fix incompatible scheduler completed", zap.String("changefeed", info.String()))

if creatorVersionGate.ChangefeedAdjustEnableOldValueByProtocol() {
log.Info("Start fixing incompatible enable old value", zap.String("changefeed", info.String()),
zap.Bool("enableOldValue", info.Config.EnableOldValue))
info.fixEnableOldValue()
log.Info("Fix incompatible enable old value completed", zap.String("changefeed", info.String()),
zap.Bool("enableOldValue", info.Config.EnableOldValue))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
Expand Down Expand Up @@ -460,6 +467,18 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
}
}

// fixEnableOldValue parses the changefeed's sink URI and hands the
// lower-cased URI scheme together with the value of the protocol query
// parameter (config.ProtocolKey) to info.Config.AdjustEnableOldValue.
//
// If the sink URI cannot be parsed, a warning is logged and the
// configuration is left untouched.
func (info *ChangeFeedInfo) fixEnableOldValue() {
	sinkURI, parseErr := url.Parse(info.SinkURI)
	if parseErr != nil {
		// Not expected in practice: presumably the same URI was already
		// accepted when the changefeed was registered.
		log.Warn("parse sink URI failed", zap.Error(parseErr))
		return
	}
	info.Config.AdjustEnableOldValue(
		strings.ToLower(sinkURI.Scheme),
		sinkURI.Query().Get(config.ProtocolKey),
	)
}

func (info *ChangeFeedInfo) fixMQSinkProtocol() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
Expand Down
Loading

0 comments on commit 6537ab8

Please sign in to comment.