Skip to content

Commit

Permalink
Do not create temporary staging path for CREATE TABLE statement
Browse files Browse the repository at this point in the history
  • Loading branch information
weijiii committed Nov 14, 2022
1 parent d4707c4 commit 792310e
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,10 @@ public void testInsertPartitionedBucketedTransactionalTableLayout()
{
// Alluxio metastore does not support insert/update/delete operations
}

@Override
public void testCreateEmptyTableShouldNotCreateStagingDirectory()
{
// Alluxio metastore does not support create operations
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,20 @@ public HiveLocationService(HdfsEnvironment hdfsEnvironment)
}

@Override
public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional<Path> externalLocation)
public Path forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName)
{
HdfsContext context = new HdfsContext(session);
Path targetPath = getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName);

// verify the target directory for table
if (pathExists(context, hdfsEnvironment, targetPath)) {
throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath));
}
return targetPath;
}

@Override
public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional<Path> externalLocation)
{
HdfsContext context = new HdfsContext(session);
Path targetPath = externalLocation.orElseGet(() -> getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
targetPath = Optional.empty();
}
else {
LocationHandle locationHandle = locationService.forNewTable(metastore, session, schemaName, tableName, Optional.empty());
targetPath = Optional.of(locationService.getQueryWriteInfo(locationHandle).getTargetPath());
targetPath = Optional.of(locationService.forNewTable(metastore, session, schemaName, tableName));
}
}

Expand Down Expand Up @@ -1536,7 +1535,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
.collect(toImmutableList());
checkPartitionTypesSupported(partitionColumns);

LocationHandle locationHandle = locationService.forNewTable(metastore, session, schemaName, tableName, externalLocation);
LocationHandle locationHandle = locationService.forNewTableAsSelect(metastore, session, schemaName, tableName, externalLocation);

AcidTransaction transaction = isTransactional ? forCreateTable() : NO_ACID_TRANSACTION;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

public interface LocationService
{
LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional<Path> externalLocation);
Path forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName);

LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional<Path> externalLocation);

LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2624,8 +2624,7 @@ public void testTableCreationIgnoreExisting()
try {
try (Transaction transaction = newTransaction()) {
LocationService locationService = getLocationService();
LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, Optional.empty());
targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath();
targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName);
Table table = createSimpleTable(schemaTableName, columns, session, targetPath, "q1");
transaction.getMetastore()
.createTable(session, table, privileges, Optional.empty(), Optional.empty(), false, EMPTY_TABLE_STATISTICS, false);
Expand Down Expand Up @@ -2968,6 +2967,57 @@ public void testEmptyTableCreation()
}
}

@Test
public void testCreateEmptyTableShouldNotCreateStagingDirectory()
throws IOException
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryCreateEmptyTable = temporaryTable("create_empty");
try {
List<Column> columns = ImmutableList.of(new Column("test", HIVE_STRING, Optional.empty()));
try (Transaction transaction = newTransaction()) {
final String temporaryStagingPrefix = "hive-temporary-staging-prefix-" + UUID.randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
ConnectorSession session = newSession(ImmutableMap.of("hive.temporary_staging_directory_path", temporaryStagingPrefix));
String tableOwner = session.getUser();
String schemaName = temporaryCreateEmptyTable.getSchemaName();
String tableName = temporaryCreateEmptyTable.getTableName();
LocationService locationService = getLocationService();
Path targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName);
Table.Builder tableBuilder = Table.builder()
.setDatabaseName(schemaName)
.setTableName(tableName)
.setOwner(Optional.of(tableOwner))
.setTableType(MANAGED_TABLE.name())
.setParameters(ImmutableMap.of(
PRESTO_VERSION_NAME, TEST_SERVER_VERSION,
PRESTO_QUERY_ID_NAME, session.getQueryId()))
.setDataColumns(columns);
tableBuilder.getStorageBuilder()
.setLocation(targetPath.toString())
.setStorageFormat(StorageFormat.create(storageFormat.getSerde(), storageFormat.getInputFormat(), storageFormat.getOutputFormat()));
transaction.getMetastore().createTable(
session,
tableBuilder.build(),
testingPrincipalPrivilege(tableOwner, session.getUser()),
Optional.empty(),
Optional.empty(),
true,
EMPTY_TABLE_STATISTICS,
false);
transaction.commit();

HdfsContext context = new HdfsContext(session);
Path temporaryRoot = new Path(targetPath, temporaryStagingPrefix);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, temporaryRoot);
assertFalse(fileSystem.exists(temporaryRoot), format("Temporary staging directory %s is created.", temporaryRoot));
}
}
finally {
dropTable(temporaryCreateEmptyTable);
}
}
}

@Test
public void testViewCreation()
{
Expand Down Expand Up @@ -3319,8 +3369,7 @@ public void testIllegalStorageFormatDuringTableScan()
String tableOwner = session.getUser();
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();
LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, Optional.empty());
Path targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath();
Path targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName);
//create table whose storage format is null
Table.Builder tableBuilder = Table.builder()
.setDatabaseName(schemaName)
Expand Down Expand Up @@ -5411,8 +5460,7 @@ protected void createEmptyTable(
String tableName = schemaTableName.getTableName();

LocationService locationService = getLocationService();
LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, Optional.empty());
targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath();
targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName);

ImmutableMap.Builder<String, String> tableParamBuilder = ImmutableMap.<String, String>builder()
.put(PRESTO_VERSION_NAME, TEST_SERVER_VERSION)
Expand Down

0 comments on commit 792310e

Please sign in to comment.