Skip to content

Commit

Permalink
Merge pull request go-mysql-org#482 from huangjunwen/master
Browse files Browse the repository at this point in the history
  • Loading branch information
IANTHEREAL authored Jun 22, 2020
2 parents 8250ec4 + 6b953b6 commit a8c16ae
Show file tree
Hide file tree
Showing 4 changed files with 516 additions and 1 deletion.
1 change: 1 addition & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {

b.cfg = cfg
b.parser = NewBinlogParser()
b.parser.SetFlavor(cfg.Flavor)
b.parser.SetRawMode(b.cfg.RawModeEnabled)
b.parser.SetParseTime(b.cfg.ParseTime)
b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)
Expand Down
11 changes: 10 additions & 1 deletion replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ var (
)

type BinlogParser struct {
// "mysql" or "mariadb", if not set, use "mysql" by default
flavor string

format *FormatDescriptionEvent

tables map[uint64]*TableMapEvent
Expand Down Expand Up @@ -200,6 +203,10 @@ func (p *BinlogParser) SetVerifyChecksum(verify bool) {
p.verifyChecksum = verify
}

func (p *BinlogParser) SetFlavor(flavor string) {
p.flavor = flavor
}

func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
h := new(EventHeader)
err := h.Decode(data)
Expand Down Expand Up @@ -234,7 +241,9 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
case XID_EVENT:
e = &XIDEvent{}
case TABLE_MAP_EVENT:
te := &TableMapEvent{}
te := &TableMapEvent{
flavor: p.flavor,
}
if p.format.EventTypeHeaderLengths[TABLE_MAP_EVENT-1] == 6 {
te.tableIDSize = 4
} else {
Expand Down
324 changes: 324 additions & 0 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
var errMissingTableMapEvent = errors.New("invalid table id, no corresponding table map event")

type TableMapEvent struct {
flavor string
tableIDSize int

TableID uint64
Expand Down Expand Up @@ -423,6 +424,116 @@ func (e *TableMapEvent) Dump(w io.Writer) {
fmt.Fprintf(w, "Enum/set default charset: %v\n", e.EnumSetDefaultCharset)
fmt.Fprintf(w, "Enum/set column charset: %v\n", e.EnumSetColumnCharset)

unsignedMap := e.UnsignedMap()
fmt.Fprintf(w, "UnsignedMap: %#v\n", unsignedMap)

collationMap := e.CollationMap()
fmt.Fprintf(w, "CollationMap: %#v\n", collationMap)

enumSetCollationMap := e.EnumSetCollationMap()
fmt.Fprintf(w, "EnumSetCollationMap: %#v\n", enumSetCollationMap)

enumStrValueMap := e.EnumStrValueMap()
fmt.Fprintf(w, "EnumStrValueMap: %#v\n", enumStrValueMap)

setStrValueMap := e.SetStrValueMap()
fmt.Fprintf(w, "SetStrValueMap: %#v\n", setStrValueMap)

geometryTypeMap := e.GeometryTypeMap()
fmt.Fprintf(w, "GeometryTypeMap: %#v\n", geometryTypeMap)

nameMaxLen := 0
for _, name := range e.ColumnName {
if len(name) > nameMaxLen {
nameMaxLen = len(name)
}
}
nameFmt := " %s"
if nameMaxLen > 0 {
nameFmt = fmt.Sprintf(" %%-%ds", nameMaxLen)
}

primaryKey := map[int]struct{}{}
for _, pk := range e.PrimaryKey {
primaryKey[int(pk)] = struct{}{}
}

fmt.Fprintf(w, "Columns: \n")
for i := 0; i < int(e.ColumnCount); i++ {
if len(e.ColumnName) == 0 {
fmt.Fprintf(w, nameFmt, "<n/a>")
} else {
fmt.Fprintf(w, nameFmt, e.ColumnName[i])
}

fmt.Fprintf(w, " type=%-3d", e.realType(i))

if e.IsNumericColumn(i) {
if len(unsignedMap) == 0 {
fmt.Fprintf(w, " unsigned=<n/a>")
} else if unsignedMap[i] {
fmt.Fprintf(w, " unsigned=yes")
} else {
fmt.Fprintf(w, " unsigned=no ")
}
}
if e.IsCharacterColumn(i) {
if len(collationMap) == 0 {
fmt.Fprintf(w, " collation=<n/a>")
} else {
fmt.Fprintf(w, " collation=%d ", collationMap[i])
}
}
if e.IsEnumColumn(i) {
if len(enumSetCollationMap) == 0 {
fmt.Fprintf(w, " enum_collation=<n/a>")
} else {
fmt.Fprintf(w, " enum_collation=%d", enumSetCollationMap[i])
}

if len(enumStrValueMap) == 0 {
fmt.Fprintf(w, " enum=<n/a>")
} else {
fmt.Fprintf(w, " enum=%v", enumStrValueMap[i])
}
}
if e.IsSetColumn(i) {
if len(enumSetCollationMap) == 0 {
fmt.Fprintf(w, " set_collation=<n/a>")
} else {
fmt.Fprintf(w, " set_collation=%d", enumSetCollationMap[i])
}

if len(setStrValueMap) == 0 {
fmt.Fprintf(w, " set=<n/a>")
} else {
fmt.Fprintf(w, " set=%v", setStrValueMap[i])
}
}
if e.IsGeometryColumn(i) {
if len(geometryTypeMap) == 0 {
fmt.Fprintf(w, " geometry_type=<n/a>")
} else {
fmt.Fprintf(w, " geometry_type=%v", geometryTypeMap[i])
}
}

available, nullable := e.Nullable(i)
if !available {
fmt.Fprintf(w, " null=<n/a>")
} else if nullable {
fmt.Fprintf(w, " null=yes")
} else {
fmt.Fprintf(w, " null=no ")
}

if _, ok := primaryKey[i]; ok {
fmt.Fprintf(w, " pri")
}

fmt.Fprintf(w, "\n")
}

fmt.Fprintln(w)
}

Expand Down Expand Up @@ -492,6 +603,219 @@ func (e *TableMapEvent) bytesSlice2StrSlice(src [][]byte) []string {
return ret
}

// UnsignedMap returns a map: column index -> unsigned.
// Note that only numeric columns will be returned.
// nil is returned if not available or no numeric columns at all.
func (e *TableMapEvent) UnsignedMap() map[int]bool {
if len(e.SignednessBitmap) == 0 {
return nil
}
p := 0
ret := make(map[int]bool)
for i := 0; i < int(e.ColumnCount); i++ {
if !e.IsNumericColumn(i) {
continue
}
ret[i] = e.SignednessBitmap[p/8]&(1<<uint(7-p%8)) != 0
p++
}
return ret
}

// CollationMap returns a map: column index -> collation id.
// Note that only character columns will be returned.
// nil is returned if not available or no character columns at all.
func (e *TableMapEvent) CollationMap() map[int]uint64 {
return e.collationMap(e.IsCharacterColumn, e.DefaultCharset, e.ColumnCharset)
}

// EnumSetCollationMap returns a map: column index -> collation id.
// Note that only enum or set columns will be returned.
// nil is returned if not available or no enum/set columns at all.
func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64 {
return e.collationMap(e.IsEnumOrSetColumn, e.EnumSetDefaultCharset, e.EnumSetColumnCharset)
}

func (e *TableMapEvent) collationMap(includeType func(int) bool, defaultCharset, columnCharset []uint64) map[int]uint64 {

if len(defaultCharset) != 0 {
defaultCollation := defaultCharset[0]

// character column index -> collation
collations := make(map[int]uint64)
for i := 1; i < len(defaultCharset); i += 2 {
collations[int(defaultCharset[i])] = defaultCharset[i+1]
}

p := 0
ret := make(map[int]uint64)
for i := 0; i < int(e.ColumnCount); i++ {
if !includeType(i) {
continue
}

if collation, ok := collations[p]; ok {
ret[i] = collation
} else {
ret[i] = defaultCollation
}
p++
}

return ret
}

if len(columnCharset) != 0 {

p := 0
ret := make(map[int]uint64)
for i := 0; i < int(e.ColumnCount); i++ {
if !includeType(i) {
continue
}

ret[i] = columnCharset[p]
p++
}

return ret
}

return nil
}

// EnumStrValueMap returns a map: column index -> enum string value.
// Note that only enum columns will be returned.
// nil is returned if not available or no enum columns at all.
func (e *TableMapEvent) EnumStrValueMap() map[int][]string {
return e.strValueMap(e.IsEnumColumn, e.EnumStrValueString())
}

// SetStrValueMap returns a map: column index -> set string value.
// Note that only set columns will be returned.
// nil is returned if not available or no set columns at all.
func (e *TableMapEvent) SetStrValueMap() map[int][]string {
return e.strValueMap(e.IsSetColumn, e.SetStrValueString())
}

func (e *TableMapEvent) strValueMap(includeType func(int) bool, strValue [][]string) map[int][]string {
if len(strValue) == 0 {
return nil
}
p := 0
ret := make(map[int][]string)
for i := 0; i < int(e.ColumnCount); i++ {
if !includeType(i) {
continue
}
ret[i] = strValue[p]
p++
}
return ret
}

// GeometryTypeMap returns a map: column index -> geometry type.
// Note that only geometry columns will be returned.
// nil is returned if not available or no geometry columns at all.
func (e *TableMapEvent) GeometryTypeMap() map[int]uint64 {
if len(e.GeometryType) == 0 {
return nil
}
p := 0
ret := make(map[int]uint64)
for i := 0; i < int(e.ColumnCount); i++ {
if !e.IsGeometryColumn(i) {
continue
}

ret[i] = e.GeometryType[p]
p++
}
return ret
}

// Below realType and IsXXXColumn are base from:
// table_def::type in sql/rpl_utility.h
// Table_map_log_event::print_columns in mysql-8.0/sql/log_event.cc and mariadb-10.5/sql/log_event_client.cc

func (e *TableMapEvent) realType(i int) byte {

typ := e.ColumnType[i]

switch typ {
case MYSQL_TYPE_STRING:
rtyp := byte(e.ColumnMeta[i] >> 8)
if rtyp == MYSQL_TYPE_ENUM || rtyp == MYSQL_TYPE_SET {
return rtyp
}

case MYSQL_TYPE_DATE:
return MYSQL_TYPE_NEWDATE
}

return typ
}

func (e *TableMapEvent) IsNumericColumn(i int) bool {

switch e.realType(i) {
case MYSQL_TYPE_TINY,
MYSQL_TYPE_SHORT,
MYSQL_TYPE_INT24,
MYSQL_TYPE_LONG,
MYSQL_TYPE_LONGLONG,
MYSQL_TYPE_NEWDECIMAL,
MYSQL_TYPE_FLOAT,
MYSQL_TYPE_DOUBLE:
return true

default:
return false
}

}

// IsCharacterColumn returns true if the column type is considered as character type.
// Note that JSON/GEOMETRY types are treated as character type in mariadb.
// (JSON is an alias for LONGTEXT in mariadb: https://mariadb.com/kb/en/json-data-type/)
func (e *TableMapEvent) IsCharacterColumn(i int) bool {

switch e.realType(i) {
case MYSQL_TYPE_STRING,
MYSQL_TYPE_VAR_STRING,
MYSQL_TYPE_VARCHAR,
MYSQL_TYPE_BLOB:
return true

case MYSQL_TYPE_GEOMETRY:
if e.flavor == "mariadb" {
return true
}
return false

default:
return false
}

}

func (e *TableMapEvent) IsEnumColumn(i int) bool {
return e.realType(i) == MYSQL_TYPE_ENUM
}

func (e *TableMapEvent) IsSetColumn(i int) bool {
return e.realType(i) == MYSQL_TYPE_SET
}

func (e *TableMapEvent) IsGeometryColumn(i int) bool {
return e.realType(i) == MYSQL_TYPE_GEOMETRY
}

func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool {
rtyp := e.realType(i)
return rtyp == MYSQL_TYPE_ENUM || rtyp == MYSQL_TYPE_SET
}

// RowsEventStmtEndFlag is set in the end of the statement.
const RowsEventStmtEndFlag = 0x01

Expand Down
Loading

0 comments on commit a8c16ae

Please sign in to comment.