Skip to content

Commit

Permalink
*: support vector data type (pingcap#748)
Browse files Browse the repository at this point in the history
  • Loading branch information
wk989898 authored Jan 2, 2025
1 parent 11ba4e3 commit 5dc4f47
Show file tree
Hide file tree
Showing 16 changed files with 191 additions and 12 deletions.
3 changes: 3 additions & 0 deletions pkg/common/event/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ func formatColVal(datum types.Datum, col *model.ColumnInfo) (
}
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), warn, nil
case mysql.TypeTiDBVectorFloat32:
v := datum.GetVectorFloat32()
return v, v.Len(), "", nil
default:
// NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail
// Make specified convert upper if you need
Expand Down
3 changes: 3 additions & 0 deletions pkg/common/event/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func unflatten(datum types.Datum, ft *types.FieldType, loc *time.Location) (type
byteSize := (ft.GetFlen() + 7) >> 3
datum.SetUint64(0)
datum.SetMysqlBit(types.NewBinaryLiteralFromUint(val, byteSize))
case mysql.TypeTiDBVectorFloat32:
datum.SetVectorFloat32(types.ZeroVectorFloat32)
return datum, nil
}
return datum, nil
}
3 changes: 3 additions & 0 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func FormatColVal(row *chunk.Row, col *model.ColumnInfo, idx int) (
b = 0
}
return b, nil
case mysql.TypeTiDBVectorFloat32:
b := row.GetVectorFloat32(idx).String()
return b, nil
default:
d := row.GetDatum(idx, &col.FieldType)
// NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail
Expand Down
10 changes: 10 additions & 0 deletions pkg/sink/codec/avro/arvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,11 @@ func (a *BatchEncoder) columnToAvroSchema(
Type: "int",
Parameters: map[string]string{tidbType: tt},
}, nil
case mysql.TypeTiDBVectorFloat32:
return avroSchema{
Type: "string",
Parameters: map[string]string{tidbType: tt},
}, nil
default:
log.Error("unknown mysql type", zap.Any("mysqlType", col.Type))
return nil, cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type")
Expand Down Expand Up @@ -973,6 +978,11 @@ func (a *BatchEncoder) columnToAvroData(
return int32(n), "int", nil
}
return int32(col.Value.(int64)), "int", nil
case mysql.TypeTiDBVectorFloat32:
if vec, ok := col.Value.(types.VectorFloat32); ok {
return vec.String(), "string", nil
}
return nil, "", cerror.ErrAvroEncodeFailed
default:
log.Error("unknown mysql type", zap.Any("value", col.Value), zap.Any("mysqlType", col.Type))
return nil, "", cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type")
Expand Down
5 changes: 4 additions & 1 deletion pkg/sink/codec/canal/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,10 @@ func formatColumnValue(row *chunk.Row, idx int, columnInfo *timodel.ColumnInfo,
yearValue := d.GetInt64()
value = strconv.FormatInt(yearValue, 10)
}

case mysql.TypeTiDBVectorFloat32:
javaType = internal.JavaSQLTypeVARCHAR
d := row.GetDatum(idx, &columnInfo.FieldType)
value = d.GetVectorFloat32().String()
default:
javaType = internal.JavaSQLTypeVARCHAR
d := row.GetDatum(idx, &columnInfo.FieldType)
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/codec/common/verify_checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -218,6 +219,9 @@ func buildChecksumBytes(buf []byte, value interface{}, mysqlType byte) ([]byte,
// this should not happen, does not take into the checksum calculation.
case mysql.TypeNull, mysql.TypeGeometry:
// do nothing
case mysql.TypeTiDBVectorFloat32:
vec, _ := types.ParseVectorFloat32(value.(string))
buf = vec.SerializeTo(buf)
default:
return buf, errors.New("invalid type for the checksum calculation")
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sink/codec/craft/message_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
pmodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

Expand Down Expand Up @@ -353,6 +354,10 @@ func DecodeTiDBType(ty byte, flag common.ColumnFlagType, bits []byte) (interface
fallthrough
case mysql.TypeGeometry:
return nil, nil
case mysql.TypeTiDBVectorFloat32:
if val, err := types.ParseVectorFloat32(string(bits)); err != nil {
return val, nil
}
}
return nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/codec/craft/message_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tiflow/cdc/model"
)

Expand Down Expand Up @@ -222,6 +223,9 @@ func EncodeTiDBType(allocator *SliceAllocator, ty byte, flag common.ColumnFlagTy
fallthrough
case mysql.TypeGeometry:
return nil
case mysql.TypeTiDBVectorFloat32:
vec := value.(types.VectorFloat32)
return []byte(vec.String())
}
return nil
}
Expand Down
17 changes: 12 additions & 5 deletions pkg/sink/codec/debezium/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema(
writer.WriteIntField("version", 1)
writer.WriteStringField("field", col.Name)
})

case mysql.TypeTiDBVectorFloat32:
writer.WriteObjectElement(func() {
writer.WriteStringField("type", "string")
writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag()))
writer.WriteStringField("name", "io.debezium.data.TiDBVectorFloat32")
writer.WriteIntField("version", 1)
writer.WriteStringField("field", col.Name)
})
default:
log.Warn(
"meet unsupported field type",
Expand Down Expand Up @@ -501,10 +508,10 @@ func (c *dbzCodec) writeDebeziumFieldValue(
writer.WriteInt64Field(col.Name, int64(v))
return nil
}

// Note: Although Debezium's doc claims to use INT32 for INT, but it
// actually uses INT64. Debezium also uses INT32 for SMALLINT.
// So we only handle with TypeLonglong here.
case mysql.TypeTiDBVectorFloat32:
v := col.Value.(types.VectorFloat32).String()
writer.WriteStringField(col.Name, v)
return nil
}

writer.WriteAnyField(col.Name, col.Value)
Expand Down
5 changes: 4 additions & 1 deletion pkg/sink/codec/internal/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -61,6 +62,8 @@ func (c *Column) FromRowChangeColumn(col *common.Column) {
str = str[1 : len(str)-1]
}
c.Value = str
case mysql.TypeTiDBVectorFloat32:
c.Value = col.Value.(types.VectorFloat32).String()
default:
c.Value = col.Value
}
Expand Down Expand Up @@ -98,8 +101,8 @@ func (c *Column) ToRowChangeColumn(name string) *common.Column {
zap.Any("col", c), zap.Error(err))
}
col.Value = uint64(val)
case mysql.TypeTiDBVectorFloat32:
default:
col.Value = c.Value
}
return col
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sink/codec/internal/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func MySQLType2JavaType(mysqlType byte, isBinary bool) JavaSQLType {
case mysql.TypeJSON:
return JavaSQLTypeVARCHAR

case mysql.TypeTiDBVectorFloat32:
return JavaSQLTypeVARCHAR

default:
return JavaSQLTypeVARCHAR
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/sink/codec/open/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,16 @@ func writeColumnFieldValueIfUpdated(
writeFunc(func() { writer.WriteStringField("v", preValue.String()) })
}
}
case mysql.TypeTiDBVectorFloat32:
rowValue := row.GetVectorFloat32(idx)
preValue := preRow.GetVectorFloat32(idx)
if rowValue.Compare(preValue) != 0 {
if preValue.IsZeroValue() {
writeFunc(func() { writer.WriteNullField("v") })
} else {
writeFunc(func() { writer.WriteStringField("v", preValue.String()) })
}
}
default:
rowDatum := row.GetDatum(idx, &col.FieldType)
// NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail
Expand Down
26 changes: 22 additions & 4 deletions pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,8 @@ func (a *avroMarshaller) encodeValue4Avro(
return v, "double"
case string:
return v, "string"
case tiTypes.VectorFloat32:
return v.String(), "string"
default:
log.Panic("unexpected type for avro value", zap.Any("value", value))
}
Expand Down Expand Up @@ -718,6 +720,8 @@ func encodeValue(
} else {
result = string(v)
}
case tiTypes.VectorFloat32:
result = v.String()
default:
result = fmt.Sprintf("%v", v)
}
Expand Down Expand Up @@ -757,7 +761,18 @@ func decodeColumn(value interface{}, id int64, fieldType *types.FieldType) *comm
case int64:
value = uint64(v)
}
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeInt24, mysql.TypeYear:
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeInt24:
switch v := value.(type) {
case string:
if mysql.HasUnsignedFlag(fieldType.GetFlag()) {
value, err = strconv.ParseUint(v, 10, 64)
} else {
value, err = strconv.ParseInt(v, 10, 64)
}
default:
value = v
}
case mysql.TypeYear:
switch v := value.(type) {
case string:
value, err = strconv.ParseInt(v, 10, 64)
Expand All @@ -767,9 +782,10 @@ func decodeColumn(value interface{}, id int64, fieldType *types.FieldType) *comm
case mysql.TypeLonglong:
switch v := value.(type) {
case string:
value, err = strconv.ParseInt(v, 10, 64)
if err != nil {
if mysql.HasUnsignedFlag(fieldType.GetFlag()) {
value, err = strconv.ParseUint(v, 10, 64)
} else {
value, err = strconv.ParseInt(v, 10, 64)
}
case map[string]interface{}:
value = uint64(v["value"].(int64))
Expand All @@ -779,7 +795,9 @@ func decodeColumn(value interface{}, id int64, fieldType *types.FieldType) *comm
case mysql.TypeFloat:
switch v := value.(type) {
case string:
value, err = strconv.ParseFloat(v, 32)
var val float64
val, err = strconv.ParseFloat(v, 32)
value = float32(val)
default:
value = v
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ const (

// To limit memory usage for prepared statements.
prepStmtCacheSize int = 16 * 1024

defaultHasVectorType = false
)

type MysqlConfig struct {
Expand Down Expand Up @@ -113,6 +115,8 @@ type MysqlConfig struct {
// implement stmtCache to improve performance, especially when the downstream is TiDB
stmtCache *lru.Cache
MaxAllowedPacket int64

HasVectorType bool // HasVectorType is true if the column is vector type
}

// NewConfig returns the default mysql backend config.
Expand All @@ -132,6 +136,7 @@ func NewMysqlConfig() *MysqlConfig {
CachePrepStmts: defaultCachePrepStmts,
SourceID: config.DefaultTiDBSourceID,
DMLMaxRetry: 8,
HasVectorType: defaultHasVectorType,
}
}

Expand Down
64 changes: 64 additions & 0 deletions pkg/sink/mysql/format_ddl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"bytes"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/parser/mysql"
"go.uber.org/zap"
)

type visiter struct{}

func (f *visiter) Enter(n ast.Node) (node ast.Node, skipChildren bool) {
switch v := n.(type) {
case *ast.ColumnDef:
if v.Tp != nil {
switch v.Tp.GetType() {
case mysql.TypeTiDBVectorFloat32:
v.Tp.SetType(mysql.TypeLongBlob)
v.Tp.SetCharset("")
v.Tp.SetCollate("")
v.Tp.SetFlen(-1)
v.Options = []*ast.ColumnOption{} // clear COMMENT
}
}
}
return n, false
}

func (f *visiter) Leave(n ast.Node) (node ast.Node, ok bool) {
return n, true
}

func formatQuery(sql string) string {
p := parser.New()
stmt, err := p.ParseOneStmt(sql, "", "")
if err != nil {
log.Error("format query parse one stmt failed", zap.Error(err))
}
stmt.Accept(&visiter{})

buf := new(bytes.Buffer)
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf)
if err = stmt.Restore(restoreCtx); err != nil {
log.Error("format query restore failed", zap.Error(err))
}
return buf.String()
}
Loading

0 comments on commit 5dc4f47

Please sign in to comment.