-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format #3658
[FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format #3658
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @SML0127's work! Just left some minor comments.
// The json string from binlog will remove useless space | ||
expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}"); | ||
expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\": \"value1\"}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This additional assignment was meant to fix the format discrepancy and could be removed now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this enhancement an optional configuration and defaults to the legacy format, considering some users may have relied on this specific JSON format?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added useLegacyJsonFormat
option as a static variable in MySqlSourceConfig, and shared it with legcay MySqlSource for legacy MySqlSource user.
Also the default value of this option is true
, so users who want to continue using current format(legacy format) don't need to worry about this new option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some tests to verify if we have the same JSON format between snapshot and binlog stage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added tests for this new option. When useLegacyJsonFormat = flase
, it verifies that the JSON type data format is the same between snapshot stage and binlog stage.
Lines 498 to 551 in 337b5de
private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat) | |
throws Exception { | |
database.createAndInitialize(); | |
CloseableIterator<Event> iterator = | |
env.fromSource( | |
getFlinkSourceProvider( | |
new String[] {"json_types"}, | |
database, | |
useLegacyJsonFormat) | |
.getSource(), | |
WatermarkStrategy.noWatermarks(), | |
"Event-Source") | |
.executeAndCollect(); | |
Object[] expectedSnapshot = | |
new Object[] { | |
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), | |
BinaryStringData.fromString("{\"key1\": \"value1\"}"), | |
BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"), | |
BinaryStringData.fromString( | |
"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"), | |
1 | |
}; | |
// skip CreateTableEvent | |
List<Event> snapshotResults = | |
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; | |
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); | |
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES)) | |
.isEqualTo(expectedSnapshot); | |
try (Connection connection = database.getJdbcConnection(); | |
Statement statement = connection.createStatement()) { | |
statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;"); | |
} | |
Object[] expectedStreamRecord = expectedSnapshot; | |
if (useLegacyJsonFormat) { | |
expectedSnapshot[1] = BinaryStringData.fromString("{\"key1\":\"value1\"}"); | |
expectedSnapshot[2] = | |
BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}"); | |
expectedSnapshot[3] = | |
BinaryStringData.fromString( | |
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"); | |
} | |
expectedSnapshot[4] = null; | |
List<Event> streamResults = | |
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; | |
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after(); | |
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES)) | |
.isEqualTo(expectedStreamRecord); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (useLegacyJsonFormat) { | ||
sb.append("\":"); | ||
} else { | ||
sb.append("\": "); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now we can choose JSON type data format (whitespace in before value)
if (useLegacyJsonFormat) { | ||
sb.append(","); | ||
} else { | ||
sb.append(", "); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now we can choose JSON type data format (whitespace in after comma)
( | ||
id SERIAL, | ||
json_c0 JSON, | ||
json_c1 JSON, | ||
json_c2 JSON, | ||
int_c INTEGER, | ||
PRIMARY KEY (id) | ||
) DEFAULT CHARSET=utf8; | ||
|
||
INSERT INTO json_types | ||
VALUES (DEFAULT, | ||
'{"key1":"value1"}', | ||
'{"key1":"value1","key2":"value2"}', | ||
'[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]', | ||
1 | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test table for various JSON format data
public MySqlSourceConfigFactory useLegacyJsonFormat(boolean useLegacyJsonFormat) { | ||
this.useLegacyJsonFormat = useLegacyJsonFormat; | ||
return this; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setter for useLegacyJsonFormat
option
@Experimental | ||
public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT = | ||
ConfigOptions.key("use.legacy.json.format") | ||
.booleanType() | ||
.defaultValue(true) | ||
.withDescription( | ||
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use.legacy.json.format
option for pipeline connector. default is true
.
@Experimental | ||
public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT = | ||
ConfigOptions.key("use.legacy.json.format") | ||
.booleanType() | ||
.defaultValue(true) | ||
.withDescription( | ||
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use.legacy.json.format
option for source connector. default is true
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @SML0127's nice work, looks good overall (I left some suggestions on test cases). Still need hearing from @ruanhang1993 on the changed configs.
/** | ||
* Copied from mysql-binlog-connector-java 0.27.2 to add whitespace before value and after comma. | ||
* | ||
* <p>Line 105: Added whitespace before value, Line 207: Added whitespace after comma |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems line numbers have been differed from actual changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accidental change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was my mistake...
if (useLegacyJsonFormat) { | ||
expectedSnapshot[1] = BinaryStringData.fromString("{\"key1\":\"value1\"}"); | ||
expectedSnapshot[2] = | ||
BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}"); | ||
expectedSnapshot[3] = | ||
BinaryStringData.fromString( | ||
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"); | ||
} | ||
expectedSnapshot[4] = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modifying expected data is a little confusing. Will something like
if (useLegacyJsonFormat) {
assertThat(RecordDataTestUtils.recordFields(...))
.containsExactly(
BinaryStringData.fromString("{\"key1\":\"value1\"}"),
BinaryStringData.fromString("{\"key2\":\"value2\"}"),
...
);
} else {
assertThat(RecordDataTestUtils.recordFields(...))
.containsExactly(...);
}
be clearer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I revised it more clearly as shown below.
if (useLegacyJsonFormat) {
// removed whitespace before value and after comma in json format string value
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
.containsExactly(
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
BinaryStringData.fromString("{\"key1\":\"value1\"}"),
BinaryStringData.fromString(
"{\"key1\":\"value1\",\"key2\":\"value2\"}"),
BinaryStringData.fromString(
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
null);
} else {
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
.containsExactly(expectedStreamRecord);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since DataStream / Table source connectors' behavior also changed, corresponding integrated test cases might be necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add integrated test cases if necessary.
Could @SML0127 please rebase this to latest |
5dd1f51
to
a88388d
Compare
Sorry to make your rebase more painful, but seems there's still some conflicts between latest |
a88388d
to
01b7919
Compare
@yuxiqian |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…n type output between snapshot phase and binlog phase This closes apache#3658
01b7919
to
7f42d88
Compare
The MySQL related test job failed due to timeout, I append one commit to set a proper timeout for the most time-consuming step |
…n type output between snapshot phase and binlog phase This closes #3658
The function to convert json to string is different in snapshot step and binlog step.
This causes the following differences.
It is already known issue, but I think it is necessary to process json in the same way in snapshot step and binlog step.
So I modified and added
JsonStringFormatter
to flink-cdc to handle json in the same way as the result of snapshot step.jira issue: https://issues.apache.org/jira/browse/FLINK-36578