Skip to content

Commit

Permalink
DBZ-8689 Provide config to override all date/datetime cols to nullable
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn authored and jpechane committed Feb 24, 2025
1 parent 38611c2 commit 10d8e6c
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,17 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ "'precise' represents values as precise (Java's 'BigDecimal') values;"
+ "'long' represents values using Java's 'long', which may not offer the precision but will be far easier to use in consumers.");

public static final Field OVERRIDE_DATETIME_TO_NULLABLE = Field.create("override.datetime.to.nullable")
.withDisplayName("Override datetime to nullable")
.withType(Type.BOOLEAN)
.withDefault(false)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("If enabled, makes all date & datetime columns nullable. Date & datetime types are incapable of representing zero-date values i.e., with" +
"month or day set to zero, e.g., 0000-00-00 or 0000-00-00 00:00:00. By overriding to nullable, the null value can be set in place" +
"of these zero-value temporal types. If disabled, zero-dates are converted to the epoch value (and cannot be differentiated from " +
"an actual epoch value)");

public static final Field TIME_PRECISION_MODE = RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 26))
Expand Down Expand Up @@ -486,6 +497,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
BINARY_HANDLING_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE,
OFFSET_STORAGE_PER_TASK,
OVERRIDE_DATETIME_TO_NULLABLE,
OFFSET_STORAGE_TASK_KEY_GEN,
PREV_NUM_TASKS,
EXCLUDE_EMPTY_SHARDS)
Expand Down Expand Up @@ -717,6 +729,10 @@ public boolean offsetStoragePerTask() {
return getConfig().getBoolean(OFFSET_STORAGE_PER_TASK);
}

public boolean overrideDatetimeToNullable() {
return getConfig().getBoolean(OVERRIDE_DATETIME_TO_NULLABLE);
}

public int getOffsetStorageTaskKeyGen() {
return getConfig().getInteger(OFFSET_STORAGE_TASK_KEY_GEN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public VitessDatabaseSchema(
ZoneOffset.UTC,
config.binaryHandlingMode(),
config.includeUnknownDatatypes(),
config.getBigIntUnsgnedHandlingMode()),
config.getBigIntUnsgnedHandlingMode(),
config.overrideDatetimeToNullable()),
schemaNameAdjuster,
config.customConverterRegistry(),
config.getSourceInfoStructMaker().schema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class VitessValueConverter extends JdbcValueConverters {
private static final BigDecimal BIGINT_CORRECTION = BIGINT_MAX_VALUE.add(BigDecimal.ONE);

private final boolean includeUnknownDatatypes;
private final boolean overrideDatetimeToNullable;
private final VitessConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode;

private static final Pattern DATE_FIELD_PATTERN = Pattern.compile("([0-9]*)-([0-9]*)-([0-9]*)");
Expand All @@ -54,9 +55,11 @@ public VitessValueConverter(
ZoneOffset defaultOffset,
BinaryHandlingMode binaryMode,
boolean includeUnknownDatatypes,
VitessConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode) {
VitessConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode,
boolean overrideDatetimeToNullable) {
super(decimalMode, temporalPrecisionMode, defaultOffset, null, null, binaryMode);
this.includeUnknownDatatypes = includeUnknownDatatypes;
this.overrideDatetimeToNullable = overrideDatetimeToNullable;
this.bigIntUnsignedHandlingMode = bigIntUnsignedHandlingMode;
}

Expand Down Expand Up @@ -96,15 +99,27 @@ public SchemaBuilder schemaBuilder(Column column) {
if (jdbcSchemaBuilder == null) {
return includeUnknownDatatypes ? SchemaBuilder.bytes() : null;
}
else {
return jdbcSchemaBuilder;
if (overrideDatetimeToNullable && isDateOrDateTime(typeName)) {
return jdbcSchemaBuilder.optional();
}
return jdbcSchemaBuilder;
}

public static boolean isDateOrDateTime(String typeName) {
return matches(typeName, Query.Type.DATETIME.name()) || matches(typeName, Query.Type.DATE.name());
}

// Convert Java value to Kafka Connect value.
@Override
public ValueConverter converter(Column column, Field fieldDefn) {
String typeName = column.typeName().toUpperCase();
public ValueConverter converter(Column inputColumn, Field fieldDefn) {
String typeName = inputColumn.typeName().toUpperCase();
final Column column;
if (overrideDatetimeToNullable && isDateOrDateTime(typeName)) {
column = inputColumn.edit().optional(true).create();
}
else {
column = inputColumn;
}
if (matches(typeName, Query.Type.ENUM.name())) {
return (data) -> convertEnumToString(column.enumValues(), column, fieldDefn, data);
}
Expand Down Expand Up @@ -351,6 +366,15 @@ public static Duration stringToDuration(String timeString) {
return isNegative && !duration.isNegative() ? duration.negated() : duration;
}

/**
* Called for DATE type of MySQL. Converts the date to a LocalDate
*
* If the datetimeString cannot be represented by Timestamp, then it returns null.
* This happens if either month or day are equal to zero. Note: zero year can be represented by Timestamp.
*
* @param dateString The dateString to convert to a LocalDate
* @return LocalDate
*/
public static LocalDate stringToLocalDate(String dateString) {
final Matcher matcher = DATE_FIELD_PATTERN.matcher(dateString);
if (!matcher.matches()) {
Expand All @@ -361,14 +385,28 @@ public static LocalDate stringToLocalDate(String dateString) {
final int month = Integer.parseInt(matcher.group(2));
final int day = Integer.parseInt(matcher.group(3));

if (year == 0 || month == 0 || day == 0) {
if (month == 0 || day == 0) {
// year == 0 is valid and can be represented by LocalDate
INVALID_VALUE_LOGGER.warn("Invalid value '{}' stored in column converted to empty value", dateString);
return null;
}
return LocalDate.of(year, month, day);
}

/**
* Called for DATETIME type of MySQL. Converts the datetimeString to a Timestamp.
*
* If the datetimeString cannot be represented by Timestamp, then it returns null.
* This happens if either month or day are equal to zero. Note: zero year can be represented by Timestamp.
*
* @param datetimeString The String to convert
* @return The java.sql.Timestamp
*/
public static Timestamp stringToTimestamp(String datetimeString) {
if (datetimeString.matches("^\\d{4}-00-00.*$")) {
INVALID_VALUE_LOGGER.warn("Invalid value '{}' stored in column converted to null value", datetimeString);
return null;
}
return Timestamp.valueOf(datetimeString);
}
}
1 change: 1 addition & 0 deletions src/test/docker/local/common.cnf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sql-mode = ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ALLOW_INVALID_DATES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
2 changes: 1 addition & 1 deletion src/test/docker/local/scripts/mysqlctl-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ if [ -d $VTDATAROOT/$tablet_dir ]; then
action='start'
fi

mysqlctl \
EXTRA_MY_CNF="/vt/local/common.cnf" mysqlctl \
--log_dir $VTDATAROOT/tmp \
--tablet_uid $uid \
--mysql_port $mysql_port \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ public abstract class AbstractVitessConnectorTest extends AbstractAsyncEngineCon
protected static final String DATETIME = "2020-02-12 01:02:03";
protected static final String TIMESTAMP = "2020-02-13 01:02:03";

protected static final String ZERO_TIME = "00:00:00";
protected static final String ZERO_DATE = "0000-00-00";
protected static final String ZERO_DATETIME = "0000-00-00 00:00:00";
protected static final String ZERO_TIMESTAMP = "0000-00-00 00:00:00";
protected static final String ZERO_TIMESTAMP_PRECISION6 = ZERO_TIMESTAMP + ".000000";
protected static final String ZERO_YEAR = "0000";

protected static final String EPOCH_DATE = "1970-01-01";
protected static final String EPOCH_DATETIME = "1970-01-01 00:00:00";
protected static final String EPOCH_DATETIME_PRECISION4 = EPOCH_DATETIME + ".0000";
protected static final String EPOCH_TIMESTAMP = "1970-01-01 00:00:01"; // MySQL allowed lower bound is one second past the epoch
protected static final String EPOCH_TIMESTAMP_PRECISION6 = EPOCH_TIMESTAMP + ".000000";
protected static final String EPOCH_YEAR = "1970";

protected static final String YEAR = "2020";
protected static final String INSERT_TIME_TYPES_STMT = "INSERT INTO time_table ("
+ "time_col,"
Expand All @@ -153,6 +167,33 @@ public abstract class AbstractVitessConnectorTest extends AbstractAsyncEngineCon
+ "year_col)"
+ String.format(" VALUES ('%s', '%s', '%s', '%s', '%s')", TIME, DATE, DATETIME, TIMESTAMP, YEAR);

protected static final String INSERT_TIME_TYPES_ZERO_VALUE_STMT = "INSERT INTO time_table_zero_value ("
+ "time_col,"
+ "time_col4,"
+ "date_col,"
+ "datetime_col,"
+ "datetime_col4,"
+ "timestamp_col,"
+ "timestamp_col6,"
+ "year_col)"
+ String.format(" VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')",
ZERO_TIME, ZERO_TIME, ZERO_DATE, ZERO_DATETIME, ZERO_DATETIME, ZERO_TIMESTAMP, ZERO_TIMESTAMP, ZERO_YEAR);

protected static final String INSERT_TIME_TYPES_EPOCH_VALUE_STMT = "INSERT INTO time_table_zero_value ("
+ "time_col,"
+ "time_col4,"
+ "date_col,"
+ "datetime_col,"
+ "datetime_col4,"
+ "timestamp_col,"
+ "timestamp_col6,"
+ "year_col)"
+ String.format(" VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')",
ZERO_TIME, ZERO_TIME, EPOCH_DATE, EPOCH_DATETIME, EPOCH_DATETIME, EPOCH_TIMESTAMP, EPOCH_TIMESTAMP, EPOCH_YEAR);

protected static final String INSERT_TIME_TYPES_ZERO_VALUE_NULLABLE_STMT = INSERT_TIME_TYPES_ZERO_VALUE_STMT.replace(
"time_table_zero_value", "time_table_zero_value_nullable");

protected static final String TIME_PRECISION1 = TIME + ".1";
protected static final String TIME_PRECISION4 = TIME + ".1234";
protected static final String DATETIME_PRECISION2 = DATETIME + ".12";
Expand Down Expand Up @@ -360,6 +401,82 @@ protected List<SchemaAndValueField> schemasAndValuesForSetType() {
return fields;
}

protected List<SchemaAndValueField> schemasAndValuesForTimeTypeZeroDate() {
final List<SchemaAndValueField> fields = new ArrayList<>();
fields.addAll(
Arrays.asList(
new SchemaAndValueField("time_col", MicroTime.schema(), getDurationMicros(ZERO_TIME)),
new SchemaAndValueField("time_col4", MicroTime.schema(), getDurationMicros(ZERO_TIME)),
new SchemaAndValueField("date_col", io.debezium.time.Date.schema(), getDateIntDays(EPOCH_DATE)),
new SchemaAndValueField(
"datetime_col", Timestamp.schema(), getMillisForDatetime(EPOCH_DATETIME, 0)),
new SchemaAndValueField(
"datetime_col4", MicroTimestamp.schema(), getMicrosForDatetime(EPOCH_DATETIME_PRECISION4, 4)),
new SchemaAndValueField(
"timestamp_col", ZonedTimestamp.schema(), ZERO_TIMESTAMP),
new SchemaAndValueField(
"timestamp_col6", ZonedTimestamp.schema(), ZERO_TIMESTAMP_PRECISION6),
new SchemaAndValueField("year_col", Year.schema(), Integer.valueOf(ZERO_YEAR))));
return fields;
}

protected List<SchemaAndValueField> schemasAndValuesForTimeTypeZeroDateNullable() {
final List<SchemaAndValueField> fields = new ArrayList<>();
fields.addAll(
Arrays.asList(
new SchemaAndValueField("time_col", MicroTime.builder().optional().build(), getDurationMicros(ZERO_TIME)),
new SchemaAndValueField("time_col4", MicroTime.builder().optional().build(), getDurationMicros(ZERO_TIME)),
new SchemaAndValueField("date_col", io.debezium.time.Date.builder().optional().schema(), null),
new SchemaAndValueField(
"datetime_col", Timestamp.builder().optional().schema(), null),
new SchemaAndValueField(
"datetime_col4", MicroTimestamp.builder().optional().schema(), null),
new SchemaAndValueField(
"timestamp_col", ZonedTimestamp.builder().optional().schema(), ZERO_TIMESTAMP),
new SchemaAndValueField(
"timestamp_col6", ZonedTimestamp.builder().optional().schema(), ZERO_TIMESTAMP_PRECISION6),
new SchemaAndValueField("year_col", Year.builder().optional().schema(), Integer.valueOf(ZERO_YEAR))));
return fields;
}

protected List<SchemaAndValueField> schemasAndValuesForTimeTypeZeroDateToNull() {
final List<SchemaAndValueField> fields = new ArrayList<>();
fields.addAll(
Arrays.asList(
new SchemaAndValueField("time_col", MicroTime.schema(), getDurationMicros(ZERO_TIME)),
new SchemaAndValueField("time_col4", MicroTime.schema(), getDurationMicros(ZERO_TIME)),
new SchemaAndValueField("date_col", io.debezium.time.Date.builder().optional().schema(), null),
new SchemaAndValueField(
"datetime_col", Timestamp.builder().optional().schema(), null),
new SchemaAndValueField(
"datetime_col4", MicroTimestamp.builder().optional().schema(), null),
new SchemaAndValueField(
"timestamp_col", ZonedTimestamp.schema(), ZERO_TIMESTAMP),
new SchemaAndValueField(
"timestamp_col6", ZonedTimestamp.schema(), ZERO_TIMESTAMP_PRECISION6),
new SchemaAndValueField("year_col", Year.schema(), Integer.valueOf(ZERO_YEAR))));
return fields;
}

protected List<SchemaAndValueField> schemasAndValuesForTimeTypeTemporalToNullWithEpoch() {
final List<SchemaAndValueField> fields = new ArrayList<>();
fields.addAll(
Arrays.asList(
new SchemaAndValueField("time_col", MicroTime.schema(), getDurationMicros(ZERO_TIME)),
new SchemaAndValueField("time_col4", MicroTime.schema(), getDurationMicros(ZERO_TIME)),
new SchemaAndValueField("date_col", io.debezium.time.Date.builder().optional().schema(), getDateIntDays(EPOCH_DATE)),
new SchemaAndValueField(
"datetime_col", Timestamp.builder().optional().schema(), getMillisForDatetime(EPOCH_DATETIME, 0)),
new SchemaAndValueField(
"datetime_col4", MicroTimestamp.builder().optional().schema(), getMicrosForDatetime(EPOCH_DATETIME_PRECISION4, 4)),
new SchemaAndValueField(
"timestamp_col", ZonedTimestamp.schema(), EPOCH_TIMESTAMP),
new SchemaAndValueField(
"timestamp_col6", ZonedTimestamp.schema(), EPOCH_TIMESTAMP_PRECISION6),
new SchemaAndValueField("year_col", Year.schema(), Integer.valueOf(EPOCH_YEAR))));
return fields;
}

protected List<SchemaAndValueField> schemasAndValuesForTimeType() {
final List<SchemaAndValueField> fields = new ArrayList<>();
fields.addAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.data.Envelope;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
Expand Down Expand Up @@ -438,6 +437,60 @@ public void shouldReceiveChangesForInsertsWithTimestampTypes() throws Exception
assertInsert(INSERT_TIME_TYPES_STMT, schemasAndValuesForTimeType(), TestHelper.PK_FIELD);
}

@Test
public void shouldReceiveChangesForInsertsWithTimestampTypesZeroValues() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");
startConnector();
assertConnectorIsRunning();

int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount);

consumer.expects(expectedRecordsCount);
assertInsert(INSERT_TIME_TYPES_ZERO_VALUE_STMT, schemasAndValuesForTimeTypeZeroDate(), TestHelper.PK_FIELD);
}

@Test
public void shouldReceiveChangesForInsertsWithTimestampTypesZeroValuesNullable() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");
startConnector();
assertConnectorIsRunning();

int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount);

consumer.expects(expectedRecordsCount);
assertInsert(INSERT_TIME_TYPES_ZERO_VALUE_NULLABLE_STMT, schemasAndValuesForTimeTypeZeroDateNullable(), TestHelper.PK_FIELD);
}

@Test
public void shouldReceiveChangesForInsertsWithTimestampTypesZeroValueToNull() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");
startConnector(config -> config.with(
VitessConnectorConfig.OVERRIDE_DATETIME_TO_NULLABLE, "true"), false);
assertConnectorIsRunning();

int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount);

consumer.expects(expectedRecordsCount);
assertInsert(INSERT_TIME_TYPES_ZERO_VALUE_STMT, schemasAndValuesForTimeTypeZeroDateToNull(), TestHelper.PK_FIELD);
}

@Test
public void shouldReceiveChangesForInsertsWithTimestampTypesZeroValueToNullWithEpoch() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");
startConnector(config -> config.with(
VitessConnectorConfig.OVERRIDE_DATETIME_TO_NULLABLE, "true"), false);
assertConnectorIsRunning();

int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount);

consumer.expects(expectedRecordsCount);
assertInsert(INSERT_TIME_TYPES_EPOCH_VALUE_STMT, schemasAndValuesForTimeTypeTemporalToNullWithEpoch(), TestHelper.PK_FIELD);
}

@Test
public void shouldReceiveChangesForInsertsWithTimestampTypesConnect() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public void before() {
ZoneOffset.UTC,
config.binaryHandlingMode(),
config.includeUnknownDatatypes(),
config.getBigIntUnsgnedHandlingMode());
config.getBigIntUnsgnedHandlingMode(),
false);
schema = new VitessDatabaseSchema(
config,
SchemaNameAdjuster.create(),
Expand Down
Loading

0 comments on commit 10d8e6c

Please sign in to comment.