Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-kafka-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Dec 29, 2021
2 parents 31edbc2 + 506e005 commit 0d6eff4
Show file tree
Hide file tree
Showing 35 changed files with 722 additions and 94 deletions.
2 changes: 1 addition & 1 deletion cdc/capture/http_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var httpBadRequestError = []*errors.Error{
cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC,
cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible,
cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError,
cerror.ErrMySQLInvalidConfig,
cerror.ErrMySQLInvalidConfig, cerror.ErrCaptureNotExist,
}

// IsHTTPBadRequestError check if a error is a http bad request error
Expand Down
2 changes: 2 additions & 0 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
status, exist := statuses[captureID]
if !exist {
_ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID))
return
}

positions, err := statusProvider.GetTaskPositions(ctx, changefeedID)
Expand All @@ -586,6 +587,7 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
position, exist := positions[captureID]
if !exist {
_ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID))
return
}

processorDetail := &model.ProcessorDetail{CheckPointTs: position.CheckPointTs, ResolvedTs: position.ResolvedTs, Error: position.Error}
Expand Down
57 changes: 44 additions & 13 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2330,16 +2330,56 @@ func (s *Scheduler) getNextLoadTaskTransfer(worker, source string) (string, stri
return "", ""
}

// hasLoadTaskByWorkerAndSource check whether there is a load subtask for the worker and source.
// hasLoadTaskByWorkerAndSource check whether there is an existing load subtask for the worker and source.
func (s *Scheduler) hasLoadTaskByWorkerAndSource(worker, source string) bool {
for _, sourceWorkerMap := range s.loadTasks {
if workerName, ok := sourceWorkerMap[source]; ok && workerName == worker {
for taskName, sourceWorkerMap := range s.loadTasks {
// don't consider removed subtask
subtasksV, ok := s.subTaskCfgs.Load(taskName)
if !ok {
continue
}
subtasks := subtasksV.(map[string]config.SubTaskConfig)
if _, ok2 := subtasks[source]; !ok2 {
continue
}

if workerName, ok2 := sourceWorkerMap[source]; ok2 && workerName == worker {
return true
}
}
return false
}

// TryResolveLoadTask checks if there are sources whose load task has local files and not bound to the worker which is
// accessible to the local files. If so, trigger a transfer source.
func (s *Scheduler) TryResolveLoadTask(sources []string) {
for _, source := range sources {
s.mu.Lock()
worker, ok := s.bounds[source]
if !ok {
s.mu.Unlock()
continue
}
if err := s.tryResolveLoadTask(worker.baseInfo.Name, source); err != nil {
s.logger.Error("tryResolveLoadTask failed", zap.Error(err))
}
s.mu.Unlock()
}
}

func (s *Scheduler) tryResolveLoadTask(originWorker, originSource string) error {
if s.hasLoadTaskByWorkerAndSource(originWorker, originSource) {
return nil
}

worker, source := s.getNextLoadTaskTransfer(originWorker, originSource)
if worker == "" && source == "" {
return nil
}

return s.transferWorkerAndSource(originWorker, originSource, worker, source)
}

func (s *Scheduler) handleLoadTaskDel(loadTask ha.LoadTask) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -2357,16 +2397,7 @@ func (s *Scheduler) handleLoadTaskDel(loadTask ha.LoadTask) error {
delete(s.loadTasks, loadTask.Task)
}

if s.hasLoadTaskByWorkerAndSource(originWorker, loadTask.Source) {
return nil
}

worker, source := s.getNextLoadTaskTransfer(originWorker, loadTask.Source)
if worker == "" && source == "" {
return nil
}

return s.transferWorkerAndSource(originWorker, loadTask.Source, worker, source)
return s.tryResolveLoadTask(originWorker, loadTask.Source)
}

func (s *Scheduler) handleLoadTaskPut(loadTask ha.LoadTask) {
Expand Down
12 changes: 12 additions & 0 deletions dm/dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1653,6 +1653,13 @@ func (t *testScheduler) TestWatchLoadTask(c *C) {
s.workers[workerName4] = worker4
s.sourceCfgs[sourceID1] = &config.SourceConfig{}
s.sourceCfgs[sourceID2] = &config.SourceConfig{}
s.subTaskCfgs.Store(task1, map[string]config.SubTaskConfig{
sourceID1: {},
})
s.subTaskCfgs.Store(task2, map[string]config.SubTaskConfig{
sourceID1: {},
sourceID2: {},
})

worker1.ToFree()
c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil)
Expand Down Expand Up @@ -1726,6 +1733,11 @@ func (t *testScheduler) TestWatchLoadTask(c *C) {
c.Assert(s.bounds[sourceID2], DeepEquals, worker4)
c.Assert(worker2.stage, Equals, WorkerFree)

// after stop-task, hasLoadTaskByWorkerAndSource is no longer valid
c.Assert(s.hasLoadTaskByWorkerAndSource(workerName4, sourceID2), IsTrue)
s.subTaskCfgs.Delete(task2)
c.Assert(s.hasLoadTaskByWorkerAndSource(workerName4, sourceID2), IsFalse)

cancel1()
wg.Wait()
}
Expand Down
2 changes: 2 additions & 0 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,8 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
release()
}

go s.scheduler.TryResolveLoadTask(sources)

resp.Result = true
if cfg.RemoveMeta {
resp.Msg = "`remove-meta` in task config is deprecated, please use `start-task ... --remove-meta` instead"
Expand Down
2 changes: 1 addition & 1 deletion dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
continue
}
// update finished offset after checkpoint updated
w.loader.finishedDataSize.Store(job.offset)
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
if _, ok := w.loader.dbTableDataFinishedSize[job.sourceSchema]; ok {
if _, ok := w.loader.dbTableDataFinishedSize[job.sourceSchema][job.sourceTable]; ok {
w.loader.dbTableDataFinishedSize[job.sourceSchema][job.sourceTable].Store(job.offset)
Expand Down
32 changes: 32 additions & 0 deletions dm/pkg/binlog/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
"github.com/go-mysql-org/go-mysql/replication"
"github.com/google/uuid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/mysql"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/simplifiedchinese"

"github.com/pingcap/tiflow/dm/pkg/terror"
)
Expand Down Expand Up @@ -376,6 +379,35 @@ func GetServerCollationByStatusVars(statusVars []byte, idAndCollationMap map[int
return idAndCollationMap[int(v)], err
}

// GetCharsetCodecByStatusVars returns an encoding.Encoding to encode and decode original query if needed.
func GetCharsetCodecByStatusVars(statusVars []byte) (encoding.Encoding, error) {
vars, err := statusVarsToKV(statusVars)
b, ok := vars[QCharsetCode]

if !ok {
if err == nil {
// only happen when this is a dummy event generated by DM
err = fmt.Errorf("Q_CHARSET_CODE not found in status_vars %v", statusVars)
}
return nil, err
}

// QCharsetCode 2-byte character_set_client + 2-byte collation_connection + 2-byte collation_server
r := bytes.NewReader(b)
var v uint16
_ = binary.Read(r, binary.LittleEndian, &v)

charsetName, _, err2 := charset.GetCharsetInfoByID(int(v))

// only handle GBK to minimize the change
switch charsetName {
case charset.CharsetGBK:
return simplifiedchinese.GBK, nil
default:
return nil, err2
}
}

// if returned error is `io.EOF`, it means UnexpectedEOF because we handled expected `io.EOF` as success
// returned map should not be nil for other usage.
func statusVarsToKV(statusVars []byte) (map[byte][]byte, error) {
Expand Down
19 changes: 19 additions & 0 deletions dm/pkg/parser/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package parser

import (
"bytes"
"strings"

"github.com/pingcap/tidb/parser/charset"

"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
Expand All @@ -35,6 +38,22 @@ const (
SingleRenameTableNameNum = 2
)

func init() {
c := &charset.Charset{
Name: charset.CharsetGBK,
DefaultCollation: "gbk_chinese_ci",
Collations: make(map[string]*charset.Collation),
Desc: "Chinese Internal Code Specification",
Maxlen: 2,
}
charset.AddCharset(c)
for _, coll := range charset.GetCollations() {
if strings.EqualFold(coll.CharsetName, c.Name) {
charset.AddCollation(coll)
}
}
}

// Parse wraps parser.Parse(), makes `parser` suitable for dm.
func Parse(p *parser.Parser, sql, charset, collation string) (stmt []ast.StmtNode, err error) {
stmts, warnings, err := p.Parse(sql, charset, collation)
Expand Down
22 changes: 0 additions & 22 deletions dm/pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/shopspring/decimal"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/log"
Expand Down Expand Up @@ -343,29 +342,8 @@ func NewSessionCtx(vars map[string]string) sessionctx.Context {

// AdjustBinaryProtocolForDatum converts the data in binlog to TiDB datum.
func AdjustBinaryProtocolForDatum(ctx sessionctx.Context, data []interface{}, cols []*model.ColumnInfo) ([]types.Datum, error) {
log.L().Debug("AdjustBinaryProtocolForChunk",
zap.Any("data", data),
zap.Any("columns", cols))
ret := make([]types.Datum, 0, len(data))
for i, d := range data {
switch v := d.(type) {
case int8:
d = int64(v)
case int16:
d = int64(v)
case int32:
d = int64(v)
case uint8:
d = uint64(v)
case uint16:
d = uint64(v)
case uint32:
d = uint64(v)
case uint:
d = uint64(v)
case decimal.Decimal:
d = v.String()
}
datum := types.NewDatum(d)
castDatum, err := table.CastValue(ctx, datum, cols[i], false, false)
if err != nil {
Expand Down
53 changes: 43 additions & 10 deletions dm/syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/shopspring/decimal"
"go.uber.org/zap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
Expand Down Expand Up @@ -76,10 +78,41 @@ type genDMLParam struct {
extendData [][]interface{} // all data include extend data
}

func extractValueFromData(data []interface{}, columns []*model.ColumnInfo) []interface{} {
// extractValueFromData adjust the values obtained from go-mysql so that
// - the values can be correctly converted to TiDB datum
// - the values are in the correct type that go-sql-driver/mysql uses.
func extractValueFromData(data []interface{}, columns []*model.ColumnInfo, sourceTI *model.TableInfo) []interface{} {
value := make([]interface{}, 0, len(data))
for i := range data {
value = append(value, castUnsigned(data[i], &columns[i].FieldType))

for i, d := range data {
d = castUnsigned(d, &columns[i].FieldType)

switch v := d.(type) {
case int8:
d = int64(v)
case int16:
d = int64(v)
case int32:
d = int64(v)
case uint8:
d = uint64(v)
case uint16:
d = uint64(v)
case uint32:
d = uint64(v)
case uint:
d = uint64(v)
case decimal.Decimal:
d = v.String()
case string:
// convert string to []byte so that go-sql-driver/mysql can use _binary'value' for DML
if columns[i].Charset == charset.CharsetGBK {
d = []byte(v)
} else if columns[i].Charset == "" && sourceTI.Charset == charset.CharsetGBK {
d = []byte(v)
}
}
value = append(value, d)
}
return value
}
Expand Down Expand Up @@ -112,10 +145,10 @@ RowLoop:
return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(data))
}

value := extractValueFromData(data, columns)
value := extractValueFromData(data, columns, ti)
originalValue := value
if len(columns) != len(ti.Columns) {
originalValue = extractValueFromData(originalDataSeq[dataIdx], ti.Columns)
originalValue = extractValueFromData(originalDataSeq[dataIdx], ti.Columns, ti)
}

for _, expr := range filterExprs {
Expand Down Expand Up @@ -181,16 +214,16 @@ RowLoop:
return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(oldData))
}

oldValues := extractValueFromData(oldData, columns)
changedValues := extractValueFromData(changedData, columns)
oldValues := extractValueFromData(oldData, columns, ti)
changedValues := extractValueFromData(changedData, columns, ti)

var oriOldValues, oriChangedValues []interface{}
if len(columns) == len(ti.Columns) {
oriOldValues = oldValues
oriChangedValues = changedValues
} else {
oriOldValues = extractValueFromData(oriOldData, ti.Columns)
oriChangedValues = extractValueFromData(oriChangedData, ti.Columns)
oriOldValues = extractValueFromData(oriOldData, ti.Columns, ti)
oriChangedValues = extractValueFromData(oriChangedData, ti.Columns, ti)
}

for j := range oldValueFilters {
Expand Down Expand Up @@ -247,7 +280,7 @@ RowLoop:
return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(ti.Columns), len(data))
}

value := extractValueFromData(data, ti.Columns)
value := extractValueFromData(data, ti.Columns, ti)

for _, expr := range filterExprs {
skip, err := SkipDMLByExpression(s.sessCtx, value, expr, ti.Columns)
Expand Down
16 changes: 16 additions & 0 deletions dm/syncer/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"

. "github.com/pingcap/check"

"github.com/pingcap/tiflow/dm/pkg/schema"
"github.com/pingcap/tiflow/dm/pkg/utils"

Expand Down Expand Up @@ -631,3 +632,18 @@ func (s *testSyncerSuite) TestTruncateIndexValues(c *C) {
assert(realPreValue, DeepEquals, tc.preValues)
}
}

func (s *testSyncerSuite) TestGBKExtractValueFromData(c *C) {
table := `CREATE TABLE t (c INT PRIMARY KEY, d VARCHAR(20) CHARSET GBK);`
se := mock.NewContext()
p := parser.New()
stmt, _, err := p.Parse(table, "", "")
c.Assert(err, IsNil)
ti, err := tiddl.MockTableInfo(se, stmt[0].(*ast.CreateTableStmt), 6716)
c.Assert(err, IsNil)

row := []interface{}{1, "\xc4\xe3\xba\xc3"}
expect := []interface{}{1, []byte("\xc4\xe3\xba\xc3")}
got := extractValueFromData(row, ti.Columns, ti)
c.Assert(got, DeepEquals, expect)
}
Loading

0 comments on commit 0d6eff4

Please sign in to comment.