diff --git a/dml_events.go b/dml_events.go index a260b378..c7e6ae3b 100644 --- a/dml_events.go +++ b/dml_events.go @@ -348,7 +348,13 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol switch v := value.(type) { case string: - return appendEscapedString(buffer, v) + var rightPadLengthForBinaryColumn uint + // see appendEscapedString() for details why we need special + // handling of BINARY column types + if column.Type == schema.TYPE_BINARY { + rightPadLengthForBinaryColumn = column.FixedSize + } + return appendEscapedString(buffer, v, rightPadLengthForBinaryColumn) case []byte: return appendEscapedBuffer(buffer, v, column.Type == schema.TYPE_JSON) case bool: @@ -362,7 +368,7 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol case float32: return strconv.AppendFloat(buffer, float64(v), 'g', -1, 64) case decimal.Decimal: - return appendEscapedString(buffer, v.String()) + return appendEscapedString(buffer, v.String(), 0) default: panic(fmt.Sprintf("unsupported type %t", value)) } @@ -406,10 +412,25 @@ func Int64Value(value interface{}) (int64, bool) { // // ref: https://github.com/mysql/mysql-server/blob/mysql-5.7.5/mysys/charset.c#L963-L1038 // ref: https://github.com/go-sql-driver/mysql/blob/9181e3a86a19bacd63e68d43ae8b7b36320d8092/utils.go#L717-L758 -func appendEscapedString(buffer []byte, value string) []byte { +// +// We need to support right-padding of the generated string using 0-bytes to +// mimic what a MySQL server would do for BINARY columns (with fixed length). +// +// ref: https://github.com/Shopify/ghostferry/pull/159 +// +// This is specifically mentioned in the link below: +// +// When BINARY values are stored, they are right-padded with the pad value +// to the specified length. The pad value is 0x00 (the zero byte). Values +// are right-padded with 0x00 for inserts, and no trailing bytes are removed +// for retrievals. 
+// +// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html +func appendEscapedString(buffer []byte, value string, rightPadLengthForBinaryColumn uint) []byte { buffer = append(buffer, '\'') - for i := 0; i < len(value); i++ { + var i int + for i = 0; i < len(value); i++ { c := value[i] if c == '\'' { buffer = append(buffer, '\'', '\'') @@ -417,6 +438,11 @@ func appendEscapedString(buffer []byte, value string) []byte { buffer = append(buffer, c) } } + // continue 0-padding up to the desired length as provided by the + // caller + for ; i < int(rightPadLengthForBinaryColumn); i++ { + buffer = append(buffer, '\x00') + } return append(buffer, '\'') } diff --git a/test/integration/types_test.rb b/test/integration/types_test.rb index edda22e2..1c122e65 100644 --- a/test/integration/types_test.rb +++ b/test/integration/types_test.rb @@ -268,6 +268,47 @@ def test_escaped_data end end + def test_copy_data_in_fixed_size_binary_column + # NOTE: We explicitly test with a value that is shorter than the max column + # size - MySQL will 0-pad the value up to the full length of the BINARY column, + # but the MySQL replication binlogs will *not* contain these 0-bytes. + # + # As a result, the binlog writer must explicitly add them when building + # update/delete statements, as the WHERE clause would otherwise not match existing + # rows in the target DB + inserted_data = "ABC" + execute_copy_data_in_fixed_size_binary_column( + column_size: 4, + inserted_data: inserted_data, + expected_inserted_data: "#{inserted_data}\x00", + updated_data: "EFGH" + ) + end + + def test_copy_data_in_fixed_size_binary_column__value_completely_filled + # NOTE: This test is interesting (beyond what is covered above already), + # because it seems the server strips the trailing 0-bytes before sending + # them to the binlog. 
+ inserted_data = "ABC\x00" + execute_copy_data_in_fixed_size_binary_column( + column_size: 4, + inserted_data: inserted_data, + expected_inserted_data: inserted_data, + updated_data: "EFGH" + ) + end + + def test_copy_data_in_fixed_size_binary_column__length1 + # slight variation to cover the corner-case where there is no data in the + # column at all and the entire value is 0-padded (here, only 1 byte) + execute_copy_data_in_fixed_size_binary_column( + column_size: 1, + inserted_data: "", + expected_inserted_data: "\x00", + updated_data: "A" + ) + end + private def insert_json_on_source @@ -280,4 +321,64 @@ def insert_json_on_source source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FALSE}')") source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_NUMBER}')") end + + def execute_copy_data_in_fixed_size_binary_column(column_size:, inserted_data:, expected_inserted_data:, updated_data:) + # test for the BINARY columns needing 0-byte padding + # + # For details, see https://github.com/Shopify/ghostferry/pull/159 + + [source_db, target_db].each do |db| + db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}") + db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data BINARY(#{column_size}), primary key(id))") + end + + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, _binary'#{inserted_data}')") + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + row_copy_called = false + ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do + # select row from the target and then make sure the data with 0 padding + # is present. 
We do this to make sure there are no races in the test + res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}") + assert_equal 1, res.count + res.each do |row| + assert_equal 1, row["id"] + assert_equal expected_inserted_data, row["data"] + end + + # now that the target is guaranteed to be in the same state as the + # source, trigger an update that will cause the binlog to stream an + # entry that needs the 0-byte padding + # + # NOTE: We could also do this via the hook + # + # Ghostferry::Status::BINLOG_STREAMING_STARTED + # + # but that puts us at risk of races within the test framework. + # See https://github.com/Shopify/ghostferry/issues/107 + source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = _binary'#{updated_data}' WHERE id = 1") + + # NOTE: We move this flag to the end of the callback to make sure that + # we don't confuse ourselves if the callback crashes before completing + row_copy_called = true + end + + ghostferry.run + + # make sure the test framework called the expected hooks above - otherwise + # the test doesn't make much sense + assert row_copy_called + assert_test_table_is_identical + + # just being paranoid here: make sure the test outcome is as expected. 
It + # should be, since we made sure the tables have the same checksums, but it + # helps understand what the test code does + res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}") + assert_equal 1, res.count + res.each do |row| + assert_equal 1, row["id"] + assert_equal updated_data, row["data"] + end + end end diff --git a/vendor/github.com/siddontang/go-mysql/schema/schema.go b/vendor/github.com/siddontang/go-mysql/schema/schema.go index cb3740ab..ebbe4218 100644 --- a/vendor/github.com/siddontang/go-mysql/schema/schema.go +++ b/vendor/github.com/siddontang/go-mysql/schema/schema.go @@ -7,6 +7,7 @@ package schema import ( "database/sql" "fmt" + "strconv" "strings" "github.com/juju/errors" @@ -29,6 +30,8 @@ const ( TYPE_TIME // time TYPE_BIT // bit TYPE_JSON // json + TYPE_BINARY // binary + TYPE_VARBINARY // varbinary ) type TableColumn struct { @@ -40,6 +43,7 @@ type TableColumn struct { IsUnsigned bool EnumValues []string SetValues []string + FixedSize uint } type Index struct { @@ -90,6 +94,11 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext ")"), "'", "", -1), ",") + } else if strings.HasPrefix(columnType, "binary") { + ta.Columns[index].Type = TYPE_BINARY + ta.Columns[index].FixedSize = getFixedSizeFromColumnType(columnType) + } else if strings.HasPrefix(columnType, "varbinary") { + ta.Columns[index].Type = TYPE_VARBINARY } else if strings.HasPrefix(columnType, "datetime") { ta.Columns[index].Type = TYPE_DATETIME } else if strings.HasPrefix(columnType, "timestamp") { @@ -104,6 +113,9 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext ta.Columns[index].Type = TYPE_JSON } else if strings.Contains(columnType, "int") || strings.HasPrefix(columnType, "year") { ta.Columns[index].Type = TYPE_NUMBER + } else if strings.HasPrefix(columnType, "char") { + ta.Columns[index].Type = TYPE_STRING + ta.Columns[index].FixedSize = getFixedSizeFromColumnType(columnType) } else { 
ta.Columns[index].Type = TYPE_STRING } @@ -118,6 +130,22 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext } } +func getFixedSizeFromColumnType(columnType string) uint { + startIndex := strings.Index(columnType, "(") + endIndex := strings.Index(columnType, ")") + if startIndex < 0 || endIndex < 0 || startIndex > endIndex { + return 0 + } + i, err := strconv.Atoi(columnType[startIndex+1:endIndex]) + if err != nil { + return 0 + } + if i < 0 { + return 0 + } + return uint(i) +} + func (ta *Table) FindColumn(name string) int { for i, col := range ta.Columns { if col.Name == name {