Skip to content

Commit

Permalink
[#1661] feat(core): introduce catalog capability framework (#2819)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

1. Introduce catalog capability framework
2. Support `column not null` capability to show how the framework works

### Why are the changes needed?

Improving code quality

Fix: #1662 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests
  • Loading branch information
mchades authored Apr 15, 2024
1 parent c19f9cf commit 8fd412c
Show file tree
Hide file tree
Showing 12 changed files with 371 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.datastrato.gravitino.connector.BaseCatalog;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.TableCatalog;
import java.util.Map;
Expand Down Expand Up @@ -37,6 +38,11 @@ protected CatalogOperations newOps(Map<String, String> config) {
return ops;
}

@Override
public Capability newCapability() {
return new HiveCatalogCapability();
}

/**
* Returns the Hive catalog operations as a {@link SupportsSchemas}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.hive;

import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.connector.capability.CapabilityResult;

public class HiveCatalogCapability implements Capability {
@Override
public CapabilityResult columnNotNull() {
// The NOT NULL constraint for column is supported since Hive3.0, see
// https://issues.apache.org/jira/browse/HIVE-16575
return CapabilityResult.unsupported(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,6 @@ private void validateColumnChangeForAlter(
|| !partitionFields.contains(fieldToAdd),
"Cannot alter partition column: " + fieldToAdd);

if (c instanceof TableChange.UpdateColumnNullability) {
throw new IllegalArgumentException(
"Hive does not support altering column nullability");
}

if (c instanceof TableChange.UpdateColumnDefaultValue) {
throw new IllegalArgumentException(
"Hive does not support altering column default value");
Expand Down Expand Up @@ -690,7 +685,6 @@ public Table createTable(
Arrays.stream(columns)
.forEach(
c -> {
validateNullable(c.name(), c.nullable());
validateColumnDefaultValue(c.name(), c.defaultValue());
});

Expand Down Expand Up @@ -791,7 +785,6 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
String fieldName = String.join(".", addColumn.fieldName());
validateNullable(fieldName, addColumn.isNullable());
validateColumnDefaultValue(fieldName, addColumn.getDefaultValue());
doAddColumn(cols, addColumn);

Expand Down Expand Up @@ -872,17 +865,6 @@ private void validateColumnDefaultValue(String fieldName, Expression defaultValu
}
}

private void validateNullable(String fieldName, boolean nullable) {
// The NOT NULL constraint for column is supported since Hive3.0, see
// https://issues.apache.org/jira/browse/HIVE-16575
if (!nullable) {
throw new IllegalArgumentException(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x. Illegal column: "
+ fieldName);
}
}

private int columnPosition(List<FieldSchema> columns, TableChange.ColumnPosition position) {
Preconditions.checkArgument(position != null, "Column position cannot be null");
if (position instanceof TableChange.After) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,33 +202,6 @@ public void testCreateHiveTable() {
sortOrders));
Assertions.assertTrue(exception.getMessage().contains("Table already exists"));

HiveColumn illegalColumn =
HiveColumn.builder()
.withName("col_3")
.withType(Types.ByteType.get())
.withComment(HIVE_COMMENT)
.withNullable(false)
.build();

exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() ->
tableCatalog.createTable(
tableIdentifier,
new Column[] {illegalColumn},
HIVE_COMMENT,
properties,
new Transform[0],
distribution,
sortOrders));
Assertions.assertTrue(
exception
.getMessage()
.contains(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x"));

HiveColumn withDefault =
HiveColumn.builder()
.withName("col_3")
Expand Down Expand Up @@ -455,21 +428,6 @@ public void testAlterHiveTable() {
() -> tableCatalog.alterTable(tableIdentifier, tableChange3));
Assertions.assertTrue(exception.getMessage().contains("Column position cannot be null"));

TableChange.ColumnPosition first = TableChange.ColumnPosition.first();
TableChange tableChange4 =
TableChange.addColumn(new String[] {"col_3"}, Types.ByteType.get(), null, first, false);

exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> tableCatalog.alterTable(tableIdentifier, tableChange4));
Assertions.assertTrue(
exception
.getMessage()
.contains(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x"));

TableChange.ColumnPosition pos = TableChange.ColumnPosition.after(col2.name());
TableChange tableChange5 =
TableChange.addColumn(new String[] {"col_3"}, Types.ByteType.get(), pos);
Expand All @@ -489,14 +447,6 @@ public void testAlterHiveTable() {
() -> tableCatalog.alterTable(tableIdentifier, tableChange6));
Assertions.assertTrue(exception.getMessage().contains("Cannot add column with duplicate name"));

TableChange tableChange7 = TableChange.updateColumnNullability(new String[] {"col_1"}, false);
exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> tableCatalog.alterTable(tableIdentifier, tableChange7));
Assertions.assertEquals(
"Hive does not support altering column nullability", exception.getMessage());

TableChange tableChange8 =
TableChange.addColumn(
new String[] {"col_3"}, Types.ByteType.get(), "comment", Literals.NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,28 @@ public void testCreateHiveTable() throws TException, InterruptedException {
Assertions.assertEquals(properties.get(key), hiveTable1.getParameters().get(key)));
assertTableEquals(createdTable1, hiveTable1);
checkTableReadWrite(hiveTable1);

// test column not null
Column illegalColumn =
Column.of("not_null_column", Types.StringType.get(), "not null column", false, false, null);
IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() ->
catalog
.asTableCatalog()
.createTable(
nameIdentifier,
new Column[] {illegalColumn},
TABLE_COMMENT,
properties,
Transforms.EMPTY_TRANSFORM));
Assertions.assertTrue(
exception
.getMessage()
.contains(
"The NOT NULL constraint for column is only supported since Hive 3.0, "
+ "but the current Gravitino Hive catalog only supports Hive 2.x"));
}

@Test
Expand Down Expand Up @@ -1098,6 +1120,20 @@ public void testAlterHiveTable() throws TException, InterruptedException {
+ "but the current Gravitino Hive catalog only supports Hive 2.x"),
"The exception message is: " + exception.getMessage());

// test alter column nullability exception
TableChange alterColumnNullability =
TableChange.updateColumnNullability(new String[] {HIVE_COL_NAME1}, false);
exception =
assertThrows(
IllegalArgumentException.class,
() -> tableCatalog.alterTable(id, alterColumnNullability));
Assertions.assertTrue(
exception
.getMessage()
.contains(
"The NOT NULL constraint for column is only supported since Hive 3.0,"
+ " but the current Gravitino Hive catalog only supports Hive 2.x. Illegal column: hive_col_name1"));

// test updateColumnPosition exception
Column col1 = Column.of("name", Types.StringType.get(), "comment");
Column col2 = Column.of("address", Types.StringType.get(), "comment");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog;

import static com.datastrato.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;

import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableChange;
import com.google.common.base.Preconditions;
import java.util.Arrays;

public class CapabilityHelpers {

public static Column[] applyCapabilities(Column[] columns, Capability capabilities) {
return Arrays.stream(columns)
.map(c -> applyCapabilities(c, capabilities))
.toArray(Column[]::new);
}

public static TableChange[] applyCapabilities(Capability capabilities, TableChange... changes) {
return Arrays.stream(changes)
.map(
change -> {
if (change instanceof TableChange.AddColumn) {
return applyCapabilities((TableChange.AddColumn) change, capabilities);

} else if (change instanceof TableChange.UpdateColumnNullability) {
return applyCapabilities(
(TableChange.UpdateColumnNullability) change, capabilities);
}
return change;
})
.toArray(TableChange[]::new);
}

private static TableChange applyCapabilities(
TableChange.AddColumn addColumn, Capability capabilities) {
Column appliedColumn =
applyCapabilities(
Column.of(
addColumn.fieldName()[0],
addColumn.getDataType(),
addColumn.getComment(),
addColumn.isNullable(),
addColumn.isAutoIncrement(),
addColumn.getDefaultValue()),
capabilities);

return TableChange.addColumn(
applyCaseSensitiveOnColumnName(addColumn.fieldName(), capabilities),
appliedColumn.dataType(),
appliedColumn.comment(),
addColumn.getPosition(),
appliedColumn.nullable(),
appliedColumn.autoIncrement(),
appliedColumn.defaultValue());
}

private static TableChange applyCapabilities(
TableChange.UpdateColumnNullability updateColumnNullability, Capability capabilities) {

applyColumnNotNull(
String.join(".", updateColumnNullability.fieldName()),
updateColumnNullability.nullable(),
capabilities);

return TableChange.updateColumnNullability(
applyCaseSensitiveOnColumnName(updateColumnNullability.fieldName(), capabilities),
updateColumnNullability.nullable());
}

private static Column applyCapabilities(Column column, Capability capabilities) {
applyColumnNotNull(column, capabilities);
applyColumnDefaultValue(column, capabilities);
applyNameSpecification(Capability.Scope.COLUMN, column.name(), capabilities);

return Column.of(
applyCaseSensitiveOnName(Capability.Scope.COLUMN, column.name(), capabilities),
column.dataType(),
column.comment(),
column.nullable(),
column.autoIncrement(),
column.defaultValue());
}

private static String applyCaseSensitiveOnName(
Capability.Scope scope, String name, Capability capabilities) {
return capabilities.caseSensitiveOnName(scope).supported() ? name : name.toLowerCase();
}

private static String[] applyCaseSensitiveOnColumnName(String[] name, Capability capabilities) {
if (!capabilities.caseSensitiveOnName(Capability.Scope.COLUMN).supported()) {
String[] standardizeColumnName = Arrays.copyOf(name, name.length);
standardizeColumnName[0] = name[0].toLowerCase();
return standardizeColumnName;
}
return name;
}

private static void applyColumnNotNull(Column column, Capability capabilities) {
applyColumnNotNull(column.name(), column.nullable(), capabilities);
}

private static void applyColumnNotNull(
String columnName, boolean nullable, Capability capabilities) {
Preconditions.checkArgument(
capabilities.columnNotNull().supported() || nullable,
capabilities.columnNotNull().unsupportedMessage() + " Illegal column: " + columnName);
}

private static void applyColumnDefaultValue(Column column, Capability capabilities) {
Preconditions.checkArgument(
capabilities.columnDefaultValue().supported()
|| DEFAULT_VALUE_NOT_SET.equals(column.defaultValue()),
capabilities.columnDefaultValue().unsupportedMessage()
+ " Illegal column: "
+ column.name());
}

private static void applyNameSpecification(
Capability.Scope scope, String name, Capability capabilities) {
Preconditions.checkArgument(
capabilities.specificationOnName(scope, name).supported(),
capabilities.specificationOnName(scope, name).unsupportedMessage()
+ " Illegal name: "
+ name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.datastrato.gravitino.SupportsCatalogs;
import com.datastrato.gravitino.connector.BaseCatalog;
import com.datastrato.gravitino.connector.HasPropertyMetadata;
import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.exceptions.CatalogAlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
Expand Down Expand Up @@ -135,6 +136,10 @@ public <R> R doWithPropertiesMeta(ThrowableFunction<HasPropertyMetadata, R> fn)
return classLoader.withClassLoader(cl -> fn.apply(catalog.ops()));
}

public Capability capabilities() throws Exception {
return classLoader.withClassLoader(cl -> catalog.capability());
}

public void close() {
try {
classLoader.withClassLoader(
Expand Down Expand Up @@ -570,6 +575,7 @@ private CatalogWrapper createCatalogWrapper(CatalogEntity entity) {
// so. For simply, We will preload the value of properties and thus AppClassLoader can get
// the value of properties.
wrapper.catalog.properties();
wrapper.catalog.capability();
return null;
},
IllegalArgumentException.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog;

import static com.datastrato.gravitino.Entity.EntityType.TABLE;
import static com.datastrato.gravitino.catalog.CapabilityHelpers.applyCapabilities;
import static com.datastrato.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
import static com.datastrato.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;

Expand Down Expand Up @@ -163,7 +164,7 @@ public Table createTable(
t ->
t.createTable(
ident,
columns,
applyCapabilities(columns, c.capabilities()),
comment,
updatedProperties,
partitions == null ? EMPTY_TRANSFORM : partitions,
Expand Down Expand Up @@ -227,7 +228,9 @@ public Table alterTable(NameIdentifier ident, TableChange... changes)
Table tempAlteredTable =
doWithCatalog(
catalogIdent,
c -> c.doWithTableOps(t -> t.alterTable(ident, changes)),
c ->
c.doWithTableOps(
t -> t.alterTable(ident, applyCapabilities(c.capabilities(), changes))),
NoSuchTableException.class,
IllegalArgumentException.class);

Expand Down
Loading

0 comments on commit 8fd412c

Please sign in to comment.