Remove system.iceberg_tables table function
We decided to add a system table instead.

This reverts commit 5ce80be.
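
For reference, the removed function listed Iceberg tables and was invoked as below (the schema name here is a placeholder; the argument is optional, as the smoke test deleted in this commit exercises):

    SELECT * FROM TABLE(iceberg.system.iceberg_tables(SCHEMA_NAME => 'example_schema'))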
ebyhr authored Jan 16, 2025
1 parent 41ba97d commit ce79254
Showing 10 changed files with 3 additions and 316 deletions.
@@ -42,7 +42,6 @@
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider;
- import io.trino.plugin.iceberg.functions.tables.IcebergTablesFunctionProvider;
import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure;
import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure;
@@ -138,9 +137,7 @@ public void configure(Binder binder)
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON);

- Multibinder<ConnectorTableFunction> tableFunctions = newSetBinder(binder, ConnectorTableFunction.class);
- tableFunctions.addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
- tableFunctions.addBinding().toProvider(IcebergTablesFunctionProvider.class).in(Scopes.SINGLETON);
+ newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(TableChangesFunctionProcessorProviderFactory.class).in(Scopes.SINGLETON);

@@ -21,7 +21,6 @@
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesSplitSource;
- import io.trino.plugin.iceberg.functions.tables.IcebergTablesFunction.IcebergTables;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
@@ -158,9 +157,6 @@ public ConnectorSplitSource getSplits(
.toSnapshot(functionHandle.endSnapshotId()));
return new ClassLoaderSafeConnectorSplitSource(tableChangesSplitSource, IcebergSplitManager.class.getClassLoader());
}
- if (function instanceof IcebergTables icebergTables) {
-     return new ClassLoaderSafeConnectorSplitSource(new FixedSplitSource(icebergTables), IcebergSplitManager.class.getClassLoader());
- }

throw new IllegalStateException("Unknown table function: " + function);
}
@@ -14,20 +14,12 @@
package io.trino.plugin.iceberg.functions;

import com.google.inject.Inject;
- import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProvider;
import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProviderFactory;
- import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionSplitProcessor;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
- import io.trino.plugin.iceberg.functions.tables.IcebergTablesFunction;
- import io.trino.spi.classloader.ThreadContextClassLoader;
- import io.trino.spi.connector.ConnectorSession;
- import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.function.FunctionProvider;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
- import io.trino.spi.function.table.TableFunctionProcessorProvider;
import io.trino.spi.function.table.TableFunctionProcessorProviderFactory;
- import io.trino.spi.function.table.TableFunctionSplitProcessor;

import static java.util.Objects.requireNonNull;

@@ -48,28 +40,6 @@ public TableFunctionProcessorProviderFactory getTableFunctionProcessorProviderFactory
if (functionHandle instanceof TableChangesFunctionHandle) {
return new ClassLoaderSafeTableFunctionProcessorProviderFactory(tableChangesFunctionProcessorProviderFactory, getClass().getClassLoader());
}
- if (functionHandle instanceof IcebergTablesFunction.IcebergTables) {
-     ClassLoader classLoader = getClass().getClassLoader();
-     return new TableFunctionProcessorProviderFactory()
-     {
-         @Override
-         public TableFunctionProcessorProvider createTableFunctionProcessorProvider()
-         {
-             try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
-                 return new ClassLoaderSafeTableFunctionProcessorProvider(new TableFunctionProcessorProvider()
-                 {
-                     @Override
-                     public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle, ConnectorSplit split)
-                     {
-                         return new ClassLoaderSafeTableFunctionSplitProcessor(
-                                 new IcebergTablesFunction.IcebergTablesProcessor(((IcebergTablesFunction.IcebergTables) split).tables()),
-                                 getClass().getClassLoader());
-                     }
-                 }, classLoader);
-             }
-         }
-     };
- }

throw new UnsupportedOperationException("Unsupported function: " + functionHandle);
}

This file was deleted.

This file was deleted.
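
The two deleted files are presumably IcebergTablesFunction.java and IcebergTablesFunctionProvider.java in io.trino.plugin.iceberg.functions.tables — the classes whose imports and references are removed above. As a rough sketch of the shape those references imply (an IcebergTables handle carrying the resolved table list, which IcebergSplitManager served as a single fixed split, and an IcebergTablesProcessor emitting one row per table), assuming the current table-function SPI; the class is renamed with a Sketch suffix and every body here is invented, not recovered from the deleted source:

    import io.trino.spi.PageBuilder;
    import io.trino.spi.connector.SchemaTableName;
    import io.trino.spi.function.table.TableFunctionProcessorState;
    import io.trino.spi.function.table.TableFunctionSplitProcessor;

    import java.util.Collection;
    import java.util.List;

    import static io.airlift.slice.Slices.utf8Slice;
    import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
    import static io.trino.spi.function.table.TableFunctionProcessorState.Processed.produced;
    import static io.trino.spi.type.VarcharType.VARCHAR;
    import static java.util.Objects.requireNonNull;

    // Hypothetical sketch: emit a (table_schema, table_name) row for each table
    // captured in the handle, then signal completion on the next call.
    class IcebergTablesProcessorSketch
            implements TableFunctionSplitProcessor
    {
        private final Collection<SchemaTableName> tables;
        private boolean finished;

        IcebergTablesProcessorSketch(Collection<SchemaTableName> tables)
        {
            this.tables = requireNonNull(tables, "tables is null");
        }

        @Override
        public TableFunctionProcessorState process()
        {
            if (finished) {
                return FINISHED;
            }
            // Build a single page with two VARCHAR columns, one position per table
            PageBuilder page = new PageBuilder(List.of(VARCHAR, VARCHAR));
            for (SchemaTableName table : tables) {
                page.declarePosition();
                VARCHAR.writeSlice(page.getBlockBuilder(0), utf8Slice(table.getSchemaName()));
                VARCHAR.writeSlice(page.getBlockBuilder(1), utf8Slice(table.getTableName()));
            }
            finished = true;
            return produced(page.build());
        }
    }

The companion IcebergTablesFunctionProvider was presumably a small Provider<ConnectorTableFunction> that existed only for the Guice binding removed in the module above.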

@@ -22,7 +22,6 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.testing.BaseConnectorSmokeTest;
- import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.sql.TestTable;
import org.apache.iceberg.FileFormat;
@@ -71,7 +70,6 @@
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
- import static java.util.stream.Collectors.joining;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@@ -838,56 +836,6 @@ public void testCreateOrReplaceWithTableChangesFunction()
}
}

- @Test
- public void testIcebergTablesFunction()
-         throws Exception
- {
-     String schemaName = getSession().getSchema().orElseThrow();
-     String firstSchema = "first_schema_" + randomNameSuffix();
-     String secondSchema = "second_schema_" + randomNameSuffix();
-     String firstSchemaLocation = schemaPath().replaceAll(schemaName, firstSchema);
-     String secondSchemaLocation = schemaPath().replaceAll(schemaName, secondSchema);
-     assertQuerySucceeds("CREATE SCHEMA " + firstSchema + " WITH (location = '%s')".formatted(firstSchemaLocation));
-     assertQuerySucceeds("CREATE SCHEMA " + secondSchema + " WITH (location = '%s')".formatted(secondSchemaLocation));
-     QueryRunner queryRunner = getQueryRunner();
-     Session firstSchemaSession = Session.builder(queryRunner.getDefaultSession()).setSchema(firstSchema).build();
-     Session secondSchemaSession = Session.builder(queryRunner.getDefaultSession()).setSchema(secondSchema).build();
-
-     try (TestTable _ = new TestTable(
-             sql -> getQueryRunner().execute(firstSchemaSession, sql),
-             "first_schema_table1_",
-             "(id int)");
-             TestTable _ = new TestTable(
-                     sql -> getQueryRunner().execute(firstSchemaSession, sql),
-                     "first_schema_table2_",
-                     "(id int)");
-             TestTable secondSchemaTable = new TestTable(
-                     sql -> queryRunner.execute(secondSchemaSession, sql),
-                     "second_schema_table_",
-                     "(id int)");
-             AutoCloseable _ = createAdditionalTables(firstSchema)) {
-         String firstSchemaTablesValues = "VALUES " + getQueryRunner()
-                 .execute("SELECT table_schema, table_name FROM iceberg.information_schema.tables WHERE table_schema='%s'".formatted(firstSchema))
-                 .getMaterializedRows().stream()
-                 .map(row -> "('%s', '%s')".formatted(row.getField(0), row.getField(1)))
-                 .collect(joining(", "));
-         String bothSchemasTablesValues = firstSchemaTablesValues + ", ('%s', '%s')".formatted(secondSchema, secondSchemaTable.getName());
-         assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(SCHEMA_NAME => '%s'))".formatted(firstSchema), firstSchemaTablesValues);
-         assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(null)) WHERE table_schema = '%s'".formatted(firstSchema), firstSchemaTablesValues);
-         assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables()) WHERE table_schema in ('%s', '%s')".formatted(firstSchema, secondSchema), bothSchemasTablesValues);
-         assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(null)) WHERE table_schema in ('%s', '%s')".formatted(firstSchema, secondSchema), bothSchemasTablesValues);
-     }
-     finally {
-         assertQuerySucceeds("DROP SCHEMA " + firstSchema);
-         assertQuerySucceeds("DROP SCHEMA " + secondSchema);
-     }
- }
-
- protected AutoCloseable createAdditionalTables(String schema)
- {
-     return () -> {};
- }
-
@Test
public void testMetadataDeleteAfterCommitEnabled()
throws IOException
@@ -13,14 +13,10 @@
*/
package io.trino.plugin.iceberg;

- import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.minio.messages.Event;
import io.trino.Session;
- import io.trino.metastore.Column;
import io.trino.metastore.HiveMetastore;
- import io.trino.metastore.HiveType;
- import io.trino.metastore.Table;
import io.trino.plugin.hive.containers.Hive3MinioDataLake;
import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.trino.testing.QueryRunner;
@@ -32,25 +28,18 @@

import java.util.List;
import java.util.Map;
- import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
- import static io.trino.metastore.PrincipalPrivileges.NO_PRIVILEGES;
- import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
- import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
import static io.trino.testing.containers.Minio.MINIO_REGION;
import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
import static java.lang.String.format;
- import static java.util.Locale.ENGLISH;
- import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
- import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;

@@ -245,25 +234,6 @@ public void testPathContainsSpecialCharacter()
assertUpdate("DROP TABLE " + tableName);
}

- @Override
- protected AutoCloseable createAdditionalTables(String schema)
- {
-     HiveMetastore metastore = getHiveMetastore(getQueryRunner());
-     // simulate iceberg table created by spark with lowercase table type
-     Table lowerCaseTableType = io.trino.metastore.Table.builder()
-             .setDatabaseName(schema)
-             .setTableName("lowercase_type_" + randomNameSuffix())
-             .setOwner(Optional.empty())
-             .setDataColumns(ImmutableList.of(new Column("id", HiveType.HIVE_STRING, Optional.empty(), ImmutableMap.of())))
-             .setTableType(EXTERNAL_TABLE.name())
-             .withStorage(storage -> storage.setStorageFormat(ICEBERG_METASTORE_STORAGE_FORMAT))
-             .setParameter("EXTERNAL", "TRUE")
-             .setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(ENGLISH))
-             .build();
-     metastore.createTable(lowerCaseTableType, NO_PRIVILEGES);
-     return () -> metastore.dropTable(lowerCaseTableType.getDatabaseName(), lowerCaseTableType.getTableName(), true);
- }
-
private String onMetastore(@Language("SQL") String sql)
{
return hiveMinioDataLake.getHiveHadoop().runOnMetastore(sql);
(diffs for the remaining changed files not shown)
