Skip to content

Commit

Permalink
[flink] Support updating row type to another row type in Flink (#4499)
Browse files (browse the repository at this point in the history)
  • Loading branch information
tsreaper authored Nov 12, 2024
1 parent f0e4cd7 commit 60f6611
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -560,7 +561,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
for (SchemaChange change : changes) {
if (change instanceof SchemaChange.AddColumn) {
SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) change;
fieldNames.addAll(addColumn.fieldNames());
fieldNames.addAll(Arrays.asList(addColumn.fieldNames()));
} else if (change instanceof SchemaChange.RenameColumn) {
SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn) change;
fieldNames.add(rename.newName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
Expand Down Expand Up @@ -54,46 +52,45 @@ static SchemaChange addColumn(String fieldName, DataType dataType) {
}

static SchemaChange addColumn(String fieldName, DataType dataType, String comment) {
return new AddColumn(Collections.singletonList(fieldName), dataType, comment, null);
return new AddColumn(new String[] {fieldName}, dataType, comment, null);
}

static SchemaChange addColumn(String fieldName, DataType dataType, String comment, Move move) {
return new AddColumn(Collections.singletonList(fieldName), dataType, comment, move);
return new AddColumn(new String[] {fieldName}, dataType, comment, move);
}

static SchemaChange addColumn(
List<String> fieldNames, DataType dataType, String comment, Move move) {
String[] fieldNames, DataType dataType, String comment, Move move) {
return new AddColumn(fieldNames, dataType, comment, move);
}

static SchemaChange renameColumn(String fieldName, String newName) {
return new RenameColumn(Collections.singletonList(fieldName), newName);
return new RenameColumn(new String[] {fieldName}, newName);
}

static SchemaChange renameColumn(List<String> fieldNames, String newName) {
static SchemaChange renameColumn(String[] fieldNames, String newName) {
return new RenameColumn(fieldNames, newName);
}

static SchemaChange dropColumn(String fieldName) {
return new DropColumn(Collections.singletonList(fieldName));
return new DropColumn(new String[] {fieldName});
}

static SchemaChange dropColumn(List<String> fieldNames) {
static SchemaChange dropColumn(String[] fieldNames) {
return new DropColumn(fieldNames);
}

static SchemaChange updateColumnType(String fieldName, DataType newDataType) {
return new UpdateColumnType(Collections.singletonList(fieldName), newDataType, false);
return new UpdateColumnType(new String[] {fieldName}, newDataType, false);
}

static SchemaChange updateColumnType(
String fieldName, DataType newDataType, boolean keepNullability) {
return new UpdateColumnType(
Collections.singletonList(fieldName), newDataType, keepNullability);
return new UpdateColumnType(new String[] {fieldName}, newDataType, keepNullability);
}

static SchemaChange updateColumnType(
List<String> fieldNames, DataType newDataType, boolean keepNullability) {
String[] fieldNames, DataType newDataType, boolean keepNullability) {
return new UpdateColumnType(fieldNames, newDataType, keepNullability);
}

Expand Down Expand Up @@ -228,20 +225,19 @@ final class AddColumn implements SchemaChange {

private static final long serialVersionUID = 1L;

private final List<String> fieldNames;
private final String[] fieldNames;
private final DataType dataType;
private final String description;
private final Move move;

private AddColumn(
List<String> fieldNames, DataType dataType, String description, Move move) {
private AddColumn(String[] fieldNames, DataType dataType, String description, Move move) {
this.fieldNames = fieldNames;
this.dataType = dataType;
this.description = description;
this.move = move;
}

public List<String> fieldNames() {
public String[] fieldNames() {
return fieldNames;
}

Expand All @@ -268,7 +264,7 @@ public boolean equals(Object o) {
return false;
}
AddColumn addColumn = (AddColumn) o;
return Objects.equals(fieldNames, addColumn.fieldNames)
return Arrays.equals(fieldNames, addColumn.fieldNames)
&& dataType.equals(addColumn.dataType)
&& Objects.equals(description, addColumn.description)
&& move.equals(addColumn.move);
Expand All @@ -288,15 +284,15 @@ final class RenameColumn implements SchemaChange {

private static final long serialVersionUID = 1L;

private final List<String> fieldNames;
private final String[] fieldNames;
private final String newName;

private RenameColumn(List<String> fieldNames, String newName) {
private RenameColumn(String[] fieldNames, String newName) {
this.fieldNames = fieldNames;
this.newName = newName;
}

public List<String> fieldNames() {
public String[] fieldNames() {
return fieldNames;
}

Expand All @@ -313,7 +309,7 @@ public boolean equals(Object o) {
return false;
}
RenameColumn that = (RenameColumn) o;
return Objects.equals(fieldNames, that.fieldNames)
return Arrays.equals(fieldNames, that.fieldNames)
&& Objects.equals(newName, that.newName);
}

Expand All @@ -330,13 +326,13 @@ final class DropColumn implements SchemaChange {

private static final long serialVersionUID = 1L;

private final List<String> fieldNames;
private final String[] fieldNames;

private DropColumn(List<String> fieldNames) {
private DropColumn(String[] fieldNames) {
this.fieldNames = fieldNames;
}

public List<String> fieldNames() {
public String[] fieldNames() {
return fieldNames;
}

Expand All @@ -349,7 +345,7 @@ public boolean equals(Object o) {
return false;
}
DropColumn that = (DropColumn) o;
return Objects.equals(fieldNames, that.fieldNames);
return Arrays.equals(fieldNames, that.fieldNames);
}

@Override
Expand All @@ -363,19 +359,19 @@ final class UpdateColumnType implements SchemaChange {

private static final long serialVersionUID = 1L;

private final List<String> fieldNames;
private final String[] fieldNames;
private final DataType newDataType;
// If true, do not change the target field nullability
private final boolean keepNullability;

private UpdateColumnType(
List<String> fieldNames, DataType newDataType, boolean keepNullability) {
String[] fieldNames, DataType newDataType, boolean keepNullability) {
this.fieldNames = fieldNames;
this.newDataType = newDataType;
this.keepNullability = keepNullability;
}

public List<String> fieldNames() {
public String[] fieldNames() {
return fieldNames;
}

Expand All @@ -396,7 +392,7 @@ public boolean equals(Object o) {
return false;
}
UpdateColumnType that = (UpdateColumnType) o;
return Objects.equals(fieldNames, that.fieldNames)
return Arrays.equals(fieldNames, that.fieldNames)
&& newDataType.equals(that.newDataType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);

new NestedColumnModifier(addColumn.fieldNames().toArray(new String[0])) {
new NestedColumnModifier(addColumn.fieldNames()) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnAlreadyExistException {
Expand Down Expand Up @@ -320,7 +320,7 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename");
new NestedColumnModifier(rename.fieldNames().toArray(new String[0])) {
new NestedColumnModifier(rename.fieldNames()) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException,
Expand All @@ -347,7 +347,7 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
dropColumnValidation(oldTableSchema, drop);
new NestedColumnModifier(drop.fieldNames().toArray(new String[0])) {
new NestedColumnModifier(drop.fieldNames()) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException {
Expand All @@ -364,7 +364,7 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
updateNestedColumn(
newFields,
update.fieldNames().toArray(new String[0]),
update.fieldNames(),
(field) -> {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
Expand Down Expand Up @@ -558,8 +558,8 @@ private static List<String> applyNotNestedColumnRename(

Map<String, String> columnNames = Maps.newHashMap();
for (RenameColumn renameColumn : renames) {
if (renameColumn.fieldNames().size() == 1) {
columnNames.put(renameColumn.fieldNames().get(0), renameColumn.newName());
if (renameColumn.fieldNames().length == 1) {
columnNames.put(renameColumn.fieldNames()[0], renameColumn.newName());
}
}

Expand All @@ -571,10 +571,10 @@ private static List<String> applyNotNestedColumnRename(

private static void dropColumnValidation(TableSchema schema, DropColumn change) {
// primary keys and partition keys can't be nested columns
if (change.fieldNames().size() > 1) {
if (change.fieldNames().length > 1) {
return;
}
String columnToDrop = change.fieldNames().get(0);
String columnToDrop = change.fieldNames()[0];
if (schema.partitionKeys().contains(columnToDrop)
|| schema.primaryKeys().contains(columnToDrop)) {
throw new UnsupportedOperationException(
Expand All @@ -583,12 +583,12 @@ private static void dropColumnValidation(TableSchema schema, DropColumn change)
}

private static void assertNotUpdatingPrimaryKeys(
TableSchema schema, List<String> fieldNames, String operation) {
TableSchema schema, String[] fieldNames, String operation) {
// partition keys can't be nested columns
if (fieldNames.size() > 1) {
if (fieldNames.length > 1) {
return;
}
String columnToRename = fieldNames.get(0);
String columnToRename = fieldNames[0];
if (schema.partitionKeys().contains(columnToRename)) {
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public void testAddAndDropNestedColumns() throws Exception {

SchemaChange addColumn =
SchemaChange.addColumn(
Arrays.asList("v", "f2", "f3"),
new String[] {"v", "f2", "f3"},
DataTypes.STRING(),
"",
SchemaChange.Move.after("f3", "f1"));
Expand All @@ -579,11 +579,11 @@ public void testAddAndDropNestedColumns() throws Exception {
.hasMessageContaining("Column v.f2.f3 already exists");
SchemaChange middleColumnNotExistAddColumn =
SchemaChange.addColumn(
Arrays.asList("v", "invalid", "f4"), DataTypes.STRING(), "", null);
new String[] {"v", "invalid", "f4"}, DataTypes.STRING(), "", null);
assertThatCode(() -> manager.commitChanges(middleColumnNotExistAddColumn))
.hasMessageContaining("Column v.invalid does not exist");

SchemaChange dropColumn = SchemaChange.dropColumn(Arrays.asList("v", "f2", "f1"));
SchemaChange dropColumn = SchemaChange.dropColumn(new String[] {"v", "f2", "f1"});
manager.commitChanges(dropColumn);

innerType =
Expand All @@ -602,7 +602,7 @@ public void testAddAndDropNestedColumns() throws Exception {
assertThatCode(() -> manager.commitChanges(dropColumn))
.hasMessageContaining("Column v.f2.f1 does not exist");
SchemaChange middleColumnNotExistDropColumn =
SchemaChange.dropColumn(Arrays.asList("v", "invalid", "f2"));
SchemaChange.dropColumn(new String[] {"v", "invalid", "f2"});
assertThatCode(() -> manager.commitChanges(middleColumnNotExistDropColumn))
.hasMessageContaining("Column v.invalid does not exist");
}
Expand Down Expand Up @@ -632,7 +632,7 @@ public void testRenameNestedColumns() throws Exception {
manager.createTable(schema);

SchemaChange renameColumn =
SchemaChange.renameColumn(Arrays.asList("v", "f2", "f1"), "f100");
SchemaChange.renameColumn(new String[] {"v", "f2", "f1"}, "f100");
manager.commitChanges(renameColumn);

innerType =
Expand All @@ -649,17 +649,17 @@ public void testRenameNestedColumns() throws Exception {
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);

SchemaChange middleColumnNotExistRenameColumn =
SchemaChange.renameColumn(Arrays.asList("v", "invalid", "f2"), "f200");
SchemaChange.renameColumn(new String[] {"v", "invalid", "f2"}, "f200");
assertThatCode(() -> manager.commitChanges(middleColumnNotExistRenameColumn))
.hasMessageContaining("Column v.invalid does not exist");

SchemaChange lastColumnNotExistRenameColumn =
SchemaChange.renameColumn(Arrays.asList("v", "f2", "invalid"), "new_invalid");
SchemaChange.renameColumn(new String[] {"v", "f2", "invalid"}, "new_invalid");
assertThatCode(() -> manager.commitChanges(lastColumnNotExistRenameColumn))
.hasMessageContaining("Column v.f2.invalid does not exist");

SchemaChange newNameAlreadyExistRenameColumn =
SchemaChange.renameColumn(Arrays.asList("v", "f2", "f2"), "f100");
SchemaChange.renameColumn(new String[] {"v", "f2", "f2"}, "f100");
assertThatCode(() -> manager.commitChanges(newNameAlreadyExistRenameColumn))
.hasMessageContaining("Column v.f2.f100 already exists");
}
Expand Down Expand Up @@ -690,7 +690,7 @@ public void testUpdateNestedColumnType() throws Exception {

SchemaChange updateColumnType =
SchemaChange.updateColumnType(
Arrays.asList("v", "f2", "f1"), DataTypes.BIGINT(), true);
new String[] {"v", "f2", "f1"}, DataTypes.BIGINT(), true);
manager.commitChanges(updateColumnType);

innerType =
Expand All @@ -708,7 +708,7 @@ public void testUpdateNestedColumnType() throws Exception {

SchemaChange middleColumnNotExistUpdateColumnType =
SchemaChange.updateColumnType(
Arrays.asList("v", "invalid", "f1"), DataTypes.BIGINT(), true);
new String[] {"v", "invalid", "f1"}, DataTypes.BIGINT(), true);
assertThatCode(() -> manager.commitChanges(middleColumnNotExistUpdateColumnType))
.hasMessageContaining("Column v.invalid does not exist");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected void applySchemaChange(
SchemaChange.UpdateColumnType updateColumnType =
(SchemaChange.UpdateColumnType) schemaChange;
Preconditions.checkState(
updateColumnType.fieldNames().size() == 1,
updateColumnType.fieldNames().length == 1,
"Paimon CDC currently does not support nested type schema evolution.");
TableSchema schema =
schemaManager
Expand All @@ -110,11 +110,11 @@ protected void applySchemaChange(
() ->
new RuntimeException(
"Table does not exist. This is unexpected."));
int idx = schema.fieldNames().indexOf(updateColumnType.fieldNames().get(0));
int idx = schema.fieldNames().indexOf(updateColumnType.fieldNames()[0]);
Preconditions.checkState(
idx >= 0,
"Field name "
+ updateColumnType.fieldNames().get(0)
+ updateColumnType.fieldNames()[0]
+ " does not exist in table. This is unexpected.");
DataType oldType = schema.fields().get(idx).type();
DataType newType = updateColumnType.newDataType();
Expand All @@ -126,7 +126,7 @@ protected void applySchemaChange(
throw new UnsupportedOperationException(
String.format(
"Cannot convert field %s from type %s to %s of Paimon table %s.",
updateColumnType.fieldNames().get(0),
updateColumnType.fieldNames()[0],
oldType,
newType,
identifier.getFullName()));
Expand Down
Loading

0 comments on commit 60f6611

Please sign in to comment.