Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove session property overrides for Hive staging location #17390

Merged
merged 1 commit into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,16 @@ public void testSession()
}

for (String part : ImmutableList.of(",", "=", ":", "|", "/", "\\", "'", "\\'", "''", "\"", "\\\"", "[", "]")) {
String value = format("/tmp/presto-%s-${USER}", part);
String value = format("my-table-%s-name", part);
try {
try (Statement statement = connection.createStatement()) {
statement.execute(format("SET SESSION hive.temporary_staging_directory_path = '%s'", value.replace("'", "''")));
statement.execute(format("SET SESSION spatial_partitioning_table_name = '%s'", value.replace("'", "''")));
}

assertThat(listSession(connection))
.contains("join_distribution_type|BROADCAST|AUTOMATIC")
.contains("exchange_compression|true|false")
.contains(format("hive.temporary_staging_directory_path|%s|/tmp/presto-${USER}", value));
.contains(format("spatial_partitioning_table_name|%s|", value));
}
catch (Exception e) {
fail(format("Failed to set session property value to [%s]", value), e);
Expand Down Expand Up @@ -420,14 +420,14 @@ private void testRole(String roleParameterValue, ClientSelectedRole clientSelect
public void testSessionProperties()
throws SQLException
{
try (Connection connection = createConnection("roles=hive:admin&sessionProperties=hive.temporary_staging_directory_path:/tmp;execution_policy:all-at-once")) {
try (Connection connection = createConnection("roles=hive:admin&sessionProperties=hive.hive_views_legacy_translation:true;execution_policy:all-at-once")) {
TrinoConnection trinoConnection = connection.unwrap(TrinoConnection.class);
assertThat(trinoConnection.getSessionProperties())
.extractingByKeys("hive.temporary_staging_directory_path", "execution_policy")
.containsExactly("/tmp", "all-at-once");
.extractingByKeys("hive.hive_views_legacy_translation", "execution_policy")
.containsExactly("true", "all-at-once");
assertThat(listSession(connection)).containsAll(ImmutableSet.of(
"execution_policy|all-at-once|phased",
"hive.temporary_staging_directory_path|/tmp|/tmp/presto-${USER}"));
"hive.hive_views_legacy_translation|true|false"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public S3SelectTestHelper(String host,
HivePartitionManager hivePartitionManager = new HivePartitionManager(this.hiveConfig);

hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, new HdfsConfig(), new NoHdfsAuthentication());
locationService = new HiveLocationService(hdfsEnvironment);
locationService = new HiveLocationService(hdfsEnvironment, hiveConfig);
JsonCodec<PartitionUpdate> partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class);

metastoreClient = new TestingHiveMetastore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Optional;

import static io.trino.plugin.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
import static io.trino.plugin.hive.HiveSessionProperties.isTemporaryStagingDirectoryEnabled;
import static io.trino.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
import static io.trino.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY;
import static io.trino.plugin.hive.LocationHandle.WriteMode.STAGE_AND_MOVE_TO_TARGET_DIRECTORY;
Expand All @@ -47,11 +46,15 @@ public class HiveLocationService
implements LocationService
{
private final HdfsEnvironment hdfsEnvironment;
private final boolean temporaryStagingDirectoryEnabled;
private final String temporaryStagingDirectoryPath;

@Inject
public HiveLocationService(HdfsEnvironment hdfsEnvironment)
public HiveLocationService(HdfsEnvironment hdfsEnvironment, HiveConfig hiveConfig)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.temporaryStagingDirectoryEnabled = hiveConfig.isTemporaryStagingDirectoryEnabled();
this.temporaryStagingDirectoryPath = hiveConfig.getTemporaryStagingDirectoryPath();
}

@Override
Expand Down Expand Up @@ -79,8 +82,8 @@ public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metasto
}

// TODO detect when existing table's location is a on a different file system than the temporary directory
if (shouldUseTemporaryDirectory(session, context, new Path(targetPath.toString()), externalLocation.isPresent())) {
Location writePath = createTemporaryPath(session, context, hdfsEnvironment, new Path(targetPath.toString()));
if (shouldUseTemporaryDirectory(context, new Path(targetPath.toString()), externalLocation.isPresent())) {
Location writePath = createTemporaryPath(context, hdfsEnvironment, new Path(targetPath.toString()), temporaryStagingDirectoryPath);
return new LocationHandle(targetPath, writePath, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
}
return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_NEW_DIRECTORY);
Expand All @@ -92,8 +95,8 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
HdfsContext context = new HdfsContext(session);
Location targetPath = Location.of(table.getStorage().getLocation());

if (shouldUseTemporaryDirectory(session, context, new Path(targetPath.toString()), false) && !isTransactionalTable(table.getParameters())) {
Location writePath = createTemporaryPath(session, context, hdfsEnvironment, new Path(targetPath.toString()));
if (shouldUseTemporaryDirectory(context, new Path(targetPath.toString()), false) && !isTransactionalTable(table.getParameters())) {
Location writePath = createTemporaryPath(context, hdfsEnvironment, new Path(targetPath.toString()), temporaryStagingDirectoryPath);
return new LocationHandle(targetPath, writePath, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
}
return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
Expand All @@ -107,9 +110,9 @@ public LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, Conn
return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
}

private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path, boolean hasExternalLocation)
private boolean shouldUseTemporaryDirectory(HdfsContext context, Path path, boolean hasExternalLocation)
{
return isTemporaryStagingDirectoryEnabled(session)
return temporaryStagingDirectoryEnabled
// skip using temporary directory for S3
&& !isS3FileSystem(context, hdfsEnvironment, path)
// skip using temporary directory if destination is encrypted; it's not possible to move a file between encryption zones
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class HivePageSinkProvider
private final HiveWriterStats hiveWriterStats;
private final long perTransactionMetastoreCacheMaximumSize;
private final DateTimeZone parquetTimeZone;
private final boolean temporaryStagingDirectoryDirectoryEnabled;
private final String temporaryStagingDirectoryPath;

@Inject
public HivePageSinkProvider(
Expand Down Expand Up @@ -116,6 +118,8 @@ public HivePageSinkProvider(
this.hiveWriterStats = requireNonNull(hiveWriterStats, "hiveWriterStats is null");
this.perTransactionMetastoreCacheMaximumSize = config.getPerTransactionMetastoreCacheMaximumSize();
this.parquetTimeZone = config.getParquetDateTimeZone();
this.temporaryStagingDirectoryDirectoryEnabled = config.isTemporaryStagingDirectoryEnabled();
this.temporaryStagingDirectoryPath = config.getTemporaryStagingDirectoryPath();
}

@Override
Expand Down Expand Up @@ -187,7 +191,9 @@ private HivePageSink createPageSink(HiveWritableTableHandle handle, boolean isCr
nodeManager,
eventClient,
hiveSessionProperties,
hiveWriterStats);
hiveWriterStats,
temporaryStagingDirectoryDirectoryEnabled,
temporaryStagingDirectoryPath);

return new HivePageSink(
handle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ public final class HiveSessionProperties
private static final String COLLECT_COLUMN_STATISTICS_ON_WRITE = "collect_column_statistics_on_write";
private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count";
private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path";
private static final String DELEGATE_TRANSACTIONAL_MANAGED_TABLE_LOCATION_TO_METASTORE = "delegate_transactional_managed_table_location_to_metastore";
private static final String IGNORE_ABSENT_PARTITIONS = "ignore_absent_partitions";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
Expand Down Expand Up @@ -510,16 +508,6 @@ public HiveSessionProperties(
"S3 Select pushdown enabled",
hiveConfig.isS3SelectPushdownEnabled(),
false),
booleanProperty(
TEMPORARY_STAGING_DIRECTORY_ENABLED,
"Should use temporary staging directory for write operations",
hiveConfig.isTemporaryStagingDirectoryEnabled(),
false),
stringProperty(
TEMPORARY_STAGING_DIRECTORY_PATH,
"Temporary staging directory location",
hiveConfig.getTemporaryStagingDirectoryPath(),
false),
booleanProperty(
DELEGATE_TRANSACTIONAL_MANAGED_TABLE_LOCATION_TO_METASTORE,
"When transactional managed table is created via Trino the location will not be set in request sent to HMS and location will be determined by metastore; if this property is set to true CREATE TABLE AS queries are not supported.",
Expand Down Expand Up @@ -950,16 +938,6 @@ public static boolean isOptimizedMismatchedBucketCount(ConnectorSession session)
return session.getProperty(OPTIMIZE_MISMATCHED_BUCKET_COUNT, Boolean.class);
}

public static boolean isTemporaryStagingDirectoryEnabled(ConnectorSession session)
{
return session.getProperty(TEMPORARY_STAGING_DIRECTORY_ENABLED, Boolean.class);
}

public static String getTemporaryStagingDirectoryPath(ConnectorSession session)
{
return session.getProperty(TEMPORARY_STAGING_DIRECTORY_PATH, String.class);
}

public static boolean isDelegateTransactionalManagedTableLocationToMetastore(ConnectorSession session)
{
return session.getProperty(DELEGATE_TRANSACTIONAL_MANAGED_TABLE_LOCATION_TO_METASTORE, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_TABLE_READ_ONLY;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
import static io.trino.plugin.hive.HiveSessionProperties.getTemporaryStagingDirectoryPath;
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.trino.plugin.hive.HiveSessionProperties.isTemporaryStagingDirectoryEnabled;
import static io.trino.plugin.hive.HiveType.toHiveType;
import static io.trino.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
import static io.trino.plugin.hive.acid.AcidOperation.CREATE_TABLE;
Expand Down Expand Up @@ -199,7 +197,9 @@ public HiveWriterFactory(
NodeManager nodeManager,
EventClient eventClient,
HiveSessionProperties hiveSessionProperties,
HiveWriterStats hiveWriterStats)
HiveWriterStats hiveWriterStats,
boolean sortedWritingTempStagingPathEnabled,
String sortedWritingTempStagingPath)
{
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
this.fileSystem = fileSystemFactory.create(session);
Expand All @@ -221,8 +221,8 @@ public HiveWriterFactory(
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.sortBufferSize = requireNonNull(sortBufferSize, "sortBufferSize is null");
this.maxOpenSortFiles = maxOpenSortFiles;
this.sortedWritingTempStagingPathEnabled = isTemporaryStagingDirectoryEnabled(session);
this.sortedWritingTempStagingPath = getTemporaryStagingDirectoryPath(session);
this.sortedWritingTempStagingPathEnabled = sortedWritingTempStagingPathEnabled;
this.sortedWritingTempStagingPath = requireNonNull(sortedWritingTempStagingPath, "sortedWritingTempStagingPath is null");
this.insertExistingPartitionsBehavior = getInsertExistingPartitionsBehavior(session);
this.parquetTimeZone = requireNonNull(parquetTimeZone, "parquetTimeZone is null");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_SERDE_NOT_FOUND;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
import static io.trino.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION;
import static io.trino.plugin.hive.HiveSessionProperties.getTemporaryStagingDirectoryPath;
import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
import static io.trino.plugin.hive.TableType.MATERIALIZED_VIEW;
import static io.trino.plugin.hive.metastore.MetastoreUtil.getProtectMode;
Expand Down Expand Up @@ -516,11 +515,10 @@ public static boolean isFileCreatedByQuery(String fileName, String queryId)
return fileName.startsWith(queryId) || fileName.endsWith(queryId);
}

public static Location createTemporaryPath(ConnectorSession session, HdfsContext context, HdfsEnvironment hdfsEnvironment, Path targetPath)
public static Location createTemporaryPath(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path targetPath, String temporaryStagingDirectoryPath)
{
// use a per-user temporary directory to avoid permission problems
String temporaryPrefix = getTemporaryStagingDirectoryPath(session)
.replace("${USER}", context.getIdentity().getUser());
String temporaryPrefix = temporaryStagingDirectoryPath.replace("${USER}", context.getIdentity().getUser());

// use relative temporary directory on ViewFS
if (isViewFileSystem(context, hdfsEnvironment, targetPath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static io.trino.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
import static io.trino.plugin.hive.HiveMetadata.PRESTO_VERSION_NAME;
import static io.trino.plugin.hive.HiveSessionProperties.getTemporaryStagingDirectoryPath;
import static io.trino.plugin.hive.HiveSessionProperties.isTemporaryStagingDirectoryEnabled;
import static io.trino.plugin.hive.HiveStorageFormat.AVRO;
import static io.trino.plugin.hive.HiveStorageFormat.CSV;
import static io.trino.plugin.hive.HiveStorageFormat.JSON;
Expand Down Expand Up @@ -825,7 +823,7 @@ protected final void setup(String databaseName, HiveConfig hiveConfig, HiveMetas
metastoreClient = hiveMetastore;
hdfsEnvironment = hdfsConfiguration;
HivePartitionManager partitionManager = new HivePartitionManager(hiveConfig);
locationService = new HiveLocationService(hdfsEnvironment);
locationService = new HiveLocationService(hdfsEnvironment, hiveConfig);
JsonCodec<PartitionUpdate> partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class);
countingDirectoryLister = new CountingDirectoryLister();
metadataFactory = new HiveMetadataFactory(
Expand Down Expand Up @@ -945,7 +943,7 @@ public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Connect
protected HiveConfig getHiveConfig()
{
return new HiveConfig()
.setTemporaryStagingDirectoryPath(temporaryStagingDirectory.toAbsolutePath().toString());
.setTemporaryStagingDirectoryPath(temporaryStagingDirectory.resolve("temp_path_").toAbsolutePath().toString());
}

protected SortingFileWriterConfig getSortingFileWriterConfig()
Expand Down Expand Up @@ -2857,16 +2855,16 @@ private void doTestBucketSortedTables(SchemaTableName table)
}

HdfsContext context = new HdfsContext(session);
HiveConfig config = getHiveConfig();
// verify we have enough temporary files per bucket to require multiple passes
Location stagingPathRoot;
if (isTemporaryStagingDirectoryEnabled(session)) {
stagingPathRoot = Location.of(getTemporaryStagingDirectoryPath(session)
if (config.isTemporaryStagingDirectoryEnabled()) {
stagingPathRoot = Location.of(config.getTemporaryStagingDirectoryPath()
.replace("${USER}", context.getIdentity().getUser()));
alexjo2144 marked this conversation as resolved.
Show resolved Hide resolved
}
else {
stagingPathRoot = getStagingPathRoot(outputHandle);
}

assertThat(listAllDataFiles(context, stagingPathRoot))
.filteredOn(file -> file.contains(".tmp-sort."))
.size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2);
Expand Down Expand Up @@ -3075,12 +3073,15 @@ public void testCreateEmptyTableShouldNotCreateStagingDirectory()
try {
List<Column> columns = ImmutableList.of(new Column("test", HIVE_STRING, Optional.empty()));
try (Transaction transaction = newTransaction()) {
final String temporaryStagingPrefix = "hive-temporary-staging-prefix-" + UUID.randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
ConnectorSession session = newSession(ImmutableMap.of("hive.temporary_staging_directory_path", temporaryStagingPrefix));
String temporaryStagingPrefix = "hive-temporary-staging-prefix-" + UUID.randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
ConnectorSession session = newSession();
String tableOwner = session.getUser();
String schemaName = temporaryCreateEmptyTable.getSchemaName();
String tableName = temporaryCreateEmptyTable.getTableName();
LocationService locationService = getLocationService();
HiveConfig hiveConfig = getHiveConfig()
.setTemporaryStagingDirectoryPath(temporaryStagingPrefix)
.setTemporaryStagingDirectoryEnabled(true);
LocationService locationService = new HiveLocationService(hdfsEnvironment, hiveConfig);
Location targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName);
Table.Builder tableBuilder = Table.builder()
.setDatabaseName(schemaName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
.build()),
getBasePath(),
hdfsEnvironment);
locationService = new HiveLocationService(hdfsEnvironment);
locationService = new HiveLocationService(hdfsEnvironment, config);
JsonCodec<PartitionUpdate> partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class);
metadataFactory = new HiveMetadataFactory(
new CatalogName("hive"),
Expand Down
Loading