[#1661] feat(core): introduce catalog capability framework #2819

Merged: 1 commit, Apr 15, 2024
@@ -7,6 +7,7 @@
import com.datastrato.gravitino.connector.BaseCatalog;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.TableCatalog;
import java.util.Map;
@@ -37,6 +38,11 @@ protected CatalogOperations newOps(Map<String, String> config) {
return ops;
}

@Override
public Capability newCapability() {
return new HiveCatalogCapability();
}

/**
* Returns the Hive catalog operations as a {@link SupportsSchemas}.
*
@@ -0,0 +1,19 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.hive;

import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.connector.capability.CapabilityResult;

public class HiveCatalogCapability implements Capability {
@Override
public CapabilityResult columnNotNull() {
// The NOT NULL constraint for column is supported since Hive3.0, see
// https://issues.apache.org/jira/browse/HIVE-16575
return CapabilityResult.unsupported(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x.");
}
}
@@ -591,11 +591,6 @@ private void validateColumnChangeForAlter(
|| !partitionFields.contains(fieldToAdd),
"Cannot alter partition column: " + fieldToAdd);

if (c instanceof TableChange.UpdateColumnNullability) {
throw new IllegalArgumentException(
"Hive does not support altering column nullability");
}

if (c instanceof TableChange.UpdateColumnDefaultValue) {
throw new IllegalArgumentException(
"Hive does not support altering column default value");
@@ -690,7 +685,6 @@ public Table createTable(
Arrays.stream(columns)
.forEach(
c -> {
validateNullable(c.name(), c.nullable());
validateColumnDefaultValue(c.name(), c.defaultValue());
});

@@ -791,7 +785,6 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
String fieldName = String.join(".", addColumn.fieldName());
validateNullable(fieldName, addColumn.isNullable());
validateColumnDefaultValue(fieldName, addColumn.getDefaultValue());
doAddColumn(cols, addColumn);

@@ -872,17 +865,6 @@ private void validateColumnDefaultValue(String fieldName, Expression defaultValue)
}
}

private void validateNullable(String fieldName, boolean nullable) {
// The NOT NULL constraint for column is supported since Hive3.0, see
// https://issues.apache.org/jira/browse/HIVE-16575
if (!nullable) {
throw new IllegalArgumentException(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x. Illegal column: "
+ fieldName);
}
}

private int columnPosition(List<FieldSchema> columns, TableChange.ColumnPosition position) {
Preconditions.checkArgument(position != null, "Column position cannot be null");
if (position instanceof TableChange.After) {
@@ -202,33 +202,6 @@ public void testCreateHiveTable() {
sortOrders));
Assertions.assertTrue(exception.getMessage().contains("Table already exists"));

HiveColumn illegalColumn =
HiveColumn.builder()
.withName("col_3")
.withType(Types.ByteType.get())
.withComment(HIVE_COMMENT)
.withNullable(false)
.build();

exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() ->
tableCatalog.createTable(
tableIdentifier,
new Column[] {illegalColumn},
HIVE_COMMENT,
properties,
new Transform[0],
distribution,
sortOrders));
Assertions.assertTrue(
exception
.getMessage()
.contains(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x"));

HiveColumn withDefault =
HiveColumn.builder()
.withName("col_3")
@@ -455,21 +428,6 @@ public void testAlterHiveTable() {
() -> tableCatalog.alterTable(tableIdentifier, tableChange3));
Assertions.assertTrue(exception.getMessage().contains("Column position cannot be null"));

TableChange.ColumnPosition first = TableChange.ColumnPosition.first();
TableChange tableChange4 =
TableChange.addColumn(new String[] {"col_3"}, Types.ByteType.get(), null, first, false);

exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> tableCatalog.alterTable(tableIdentifier, tableChange4));
Assertions.assertTrue(
exception
.getMessage()
.contains(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x"));

TableChange.ColumnPosition pos = TableChange.ColumnPosition.after(col2.name());
TableChange tableChange5 =
TableChange.addColumn(new String[] {"col_3"}, Types.ByteType.get(), pos);
@@ -489,14 +447,6 @@ public void testAlterHiveTable() {
() -> tableCatalog.alterTable(tableIdentifier, tableChange6));
Assertions.assertTrue(exception.getMessage().contains("Cannot add column with duplicate name"));

TableChange tableChange7 = TableChange.updateColumnNullability(new String[] {"col_1"}, false);
exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> tableCatalog.alterTable(tableIdentifier, tableChange7));
Assertions.assertEquals(
"Hive does not support altering column nullability", exception.getMessage());

TableChange tableChange8 =
TableChange.addColumn(
new String[] {"col_3"}, Types.ByteType.get(), "comment", Literals.NULL);
@@ -485,6 +485,28 @@ public void testCreateHiveTable() throws TException, InterruptedException {
Assertions.assertEquals(properties.get(key), hiveTable1.getParameters().get(key)));
assertTableEquals(createdTable1, hiveTable1);
checkTableReadWrite(hiveTable1);

// test column not null
Column illegalColumn =
Column.of("not_null_column", Types.StringType.get(), "not null column", false, false, null);
IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() ->
catalog
.asTableCatalog()
.createTable(
nameIdentifier,
new Column[] {illegalColumn},
TABLE_COMMENT,
properties,
Transforms.EMPTY_TRANSFORM));
Assertions.assertTrue(
exception
.getMessage()
.contains(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x"));
}

@Test
@@ -1098,6 +1120,20 @@ public void testAlterHiveTable() throws TException, InterruptedException {
+ "but the current Gravitino Hive catalog only supports Hive 2.x"),
"The exception message is: " + exception.getMessage());

// test alter column nullability exception
TableChange alterColumnNullability =
TableChange.updateColumnNullability(new String[] {HIVE_COL_NAME1}, false);
exception =
assertThrows(
IllegalArgumentException.class,
() -> tableCatalog.alterTable(id, alterColumnNullability));
Assertions.assertTrue(
exception
.getMessage()
.contains(
"The NOT NULL constraint for column is only supported since Hive 3.0,"
+ " but the current Gravitino Hive catalog only supports Hive 2.x. Illegal column: hive_col_name1"));

// test updateColumnPosition exception
Column col1 = Column.of("name", Types.StringType.get(), "comment");
Column col2 = Column.of("address", Types.StringType.get(), "comment");
@@ -0,0 +1,131 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog;

import static com.datastrato.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;

import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableChange;
import com.google.common.base.Preconditions;
import java.util.Arrays;

public class CapabilityHelpers {

public static Column[] applyCapabilities(Column[] columns, Capability capabilities) {
Contributor: It is just one capability; why do you need to use the plural?

Contributor Author: This method applies multiple capabilities to the column, such as default value, not null, case sensitivity, and so on, which is why the plural is used. (A short usage sketch follows this method.)

return Arrays.stream(columns)
.map(c -> applyCapabilities(c, capabilities))
.toArray(Column[]::new);
}
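
For illustration, a minimal usage sketch of how this helper is expected to behave for a catalog whose columnNotNull() capability is unsupported, such as the HiveCatalogCapability added in this PR. The wrapper class name and the Column/Types import paths are assumptions; the applyCapabilities call and the resulting IllegalArgumentException follow from the code in this file.

import com.datastrato.gravitino.catalog.CapabilityHelpers;
import com.datastrato.gravitino.catalog.hive.HiveCatalogCapability;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.types.Types;

public class ApplyCapabilitiesSketch {
  public static void main(String[] args) {
    // A NOT NULL column; HiveCatalogCapability reports columnNotNull() as unsupported.
    Column notNullColumn =
        Column.of("id", Types.StringType.get(), "primary id", false, false, null);
    try {
      // Expected to fail the not-null precondition in applyColumnNotNull.
      CapabilityHelpers.applyCapabilities(
          new Column[] {notNullColumn}, new HiveCatalogCapability());
    } catch (IllegalArgumentException e) {
      // e.g. "The NOT NULL constraint for column is only supported since Hive 3.0, ... Illegal column: id"
      System.out.println(e.getMessage());
    }
  }
}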

public static TableChange[] applyCapabilities(Capability capabilities, TableChange... changes) {
return Arrays.stream(changes)
.map(
change -> {
if (change instanceof TableChange.AddColumn) {
return applyCapabilities((TableChange.AddColumn) change, capabilities);

} else if (change instanceof TableChange.UpdateColumnNullability) {
return applyCapabilities(
(TableChange.UpdateColumnNullability) change, capabilities);
}
return change;
})
.toArray(TableChange[]::new);
}

private static TableChange applyCapabilities(
TableChange.AddColumn addColumn, Capability capabilities) {
Column appliedColumn =
applyCapabilities(
Column.of(
addColumn.fieldName()[0],
Contributor: Why take only the first element of the fieldName?

Contributor Author: Because the first element is the column name; the others are just nested fields within the column. (See the sketch after this method.)

addColumn.getDataType(),
addColumn.getComment(),
addColumn.isNullable(),
addColumn.isAutoIncrement(),
addColumn.getDefaultValue()),
capabilities);

return TableChange.addColumn(
applyCaseSensitiveOnColumnName(addColumn.fieldName(), capabilities),
appliedColumn.dataType(),
appliedColumn.comment(),
addColumn.getPosition(),
appliedColumn.nullable(),
appliedColumn.autoIncrement(),
appliedColumn.defaultValue());
}
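
A brief, hedged illustration of the fieldName convention discussed above: only the column-level element (index 0) is lowercased for a case-insensitive catalog, mirroring what applyCaseSensitiveOnColumnName does. The class name and the "Address"/"City" names are made up for the example.

public class FieldNameSketch {
  public static void main(String[] args) {
    // Hypothetical nested field: struct column "Address" with nested field "City".
    String[] fieldName = {"Address", "City"};

    // Mirror of applyCaseSensitiveOnColumnName: lowercase only the column-level element,
    // leaving nested field names untouched.
    String[] standardized = fieldName.clone();
    standardized[0] = standardized[0].toLowerCase();

    System.out.println(String.join(".", standardized)); // prints "address.City"
  }
}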

private static TableChange applyCapabilities(
TableChange.UpdateColumnNullability updateColumnNullability, Capability capabilities) {

applyColumnNotNull(
String.join(".", updateColumnNullability.fieldName()),
updateColumnNullability.nullable(),
capabilities);

return TableChange.updateColumnNullability(
applyCaseSensitiveOnColumnName(updateColumnNullability.fieldName(), capabilities),
updateColumnNullability.nullable());
}

private static Column applyCapabilities(Column column, Capability capabilities) {
applyColumnNotNull(column, capabilities);
applyColumnDefaultValue(column, capabilities);
applyNameSpecification(Capability.Scope.COLUMN, column.name(), capabilities);

return Column.of(
applyCaseSensitiveOnName(Capability.Scope.COLUMN, column.name(), capabilities),
column.dataType(),
column.comment(),
column.nullable(),
column.autoIncrement(),
column.defaultValue());
}

private static String applyCaseSensitiveOnName(
Capability.Scope scope, String name, Capability capabilities) {
return capabilities.caseSensitiveOnName(scope).supported() ? name : name.toLowerCase();
}

private static String[] applyCaseSensitiveOnColumnName(String[] name, Capability capabilities) {
if (!capabilities.caseSensitiveOnName(Capability.Scope.COLUMN).supported()) {
String[] standardizeColumnName = Arrays.copyOf(name, name.length);
standardizeColumnName[0] = name[0].toLowerCase();
return standardizeColumnName;
}
return name;
}

private static void applyColumnNotNull(Column column, Capability capabilities) {
applyColumnNotNull(column.name(), column.nullable(), capabilities);
}

private static void applyColumnNotNull(
String columnName, boolean nullable, Capability capabilities) {
Preconditions.checkArgument(
capabilities.columnNotNull().supported() || nullable,
capabilities.columnNotNull().unsupportedMessage() + " Illegal column: " + columnName);
}

private static void applyColumnDefaultValue(Column column, Capability capabilities) {
Preconditions.checkArgument(
capabilities.columnDefaultValue().supported()
|| DEFAULT_VALUE_NOT_SET.equals(column.defaultValue()),
capabilities.columnDefaultValue().unsupportedMessage()
+ " Illegal column: "
+ column.name());
}

private static void applyNameSpecification(
Capability.Scope scope, String name, Capability capabilities) {
Preconditions.checkArgument(
capabilities.specificationOnName(scope, name).supported(),
capabilities.specificationOnName(scope, name).unsupportedMessage()
+ " Illegal name: "
+ name);
}
}
@@ -25,6 +25,7 @@
import com.datastrato.gravitino.SupportsCatalogs;
import com.datastrato.gravitino.connector.BaseCatalog;
import com.datastrato.gravitino.connector.HasPropertyMetadata;
import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.exceptions.CatalogAlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
@@ -135,6 +136,10 @@ public <R> R doWithPropertiesMeta(ThrowableFunction<HasPropertyMetadata, R> fn)
return classLoader.withClassLoader(cl -> fn.apply(catalog.ops()));
}

public Capability capabilities() throws Exception {
return classLoader.withClassLoader(cl -> catalog.capability());
}

public void close() {
try {
classLoader.withClassLoader(
@@ -566,6 +571,7 @@ private CatalogWrapper createCatalogWrapper(CatalogEntity entity) {
// so. For simply, We will preload the value of properties and thus AppClassLoader can get
// the value of properties.
wrapper.catalog.properties();
wrapper.catalog.capability();
return null;
},
IllegalArgumentException.class);
@@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog;

import static com.datastrato.gravitino.Entity.EntityType.TABLE;
import static com.datastrato.gravitino.catalog.CapabilityHelpers.applyCapabilities;
import static com.datastrato.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
import static com.datastrato.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;

@@ -158,7 +159,7 @@ public Table createTable(
t ->
t.createTable(
ident,
columns,
applyCapabilities(columns, c.capabilities()),
comment,
updatedProperties,
partitions == null ? EMPTY_TRANSFORM : partitions,
@@ -222,7 +223,9 @@ public Table alterTable(NameIdentifier ident, TableChange... changes)
Table tempAlteredTable =
doWithCatalog(
catalogIdent,
c -> c.doWithTableOps(t -> t.alterTable(ident, changes)),
c ->
c.doWithTableOps(
t -> t.alterTable(ident, applyCapabilities(c.capabilities(), changes))),
NoSuchTableException.class,
IllegalArgumentException.class);
