Skip to content

Commit

Permalink
[native] Set sane defaults for PRESTO_SERVER and DATA_DIR for NativeQ…
Browse files Browse the repository at this point in the history
…ueryRunner
  • Loading branch information
ZacBlanco authored and aditi-pandit committed Oct 14, 2023
1 parent 3a738e9 commit c6292d0
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@

import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerHiveProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerSystemProperties;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static org.testng.Assert.assertNotNull;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertTrue;

public class PrestoNativeQueryRunnerUtils
{
Expand All @@ -53,18 +53,12 @@ private PrestoNativeQueryRunnerUtils() {}
public static QueryRunner createQueryRunner()
throws Exception
{
String prestoServerPath = System.getenv("PRESTO_SERVER");
String dataDirectory = System.getenv("DATA_DIR");
String workerCount = System.getenv("WORKER_COUNT");
int cacheMaxSize = 4096; // 4GB size cache

checkArgument(prestoServerPath != null, "Native worker binary path is missing. Add PRESTO_SERVER environment variable.");
checkArgument(dataDirectory != null, "Data directory path is missing.. Add DATA_DIR environment variable.");

NativeQueryRunnerParameters nativeQueryRunnerParameters = getNativeQueryRunnerParameters();
return createQueryRunner(
Optional.ofNullable(prestoServerPath),
Optional.ofNullable(dataDirectory).map(Paths::get),
Optional.ofNullable(workerCount).map(Integer::parseInt),
Optional.of(nativeQueryRunnerParameters.serverBinary.toString()),
Optional.of(nativeQueryRunnerParameters.dataDirectory),
nativeQueryRunnerParameters.workerCount,
cacheMaxSize,
DEFAULT_STORAGE_FORMAT);
}
Expand All @@ -77,10 +71,6 @@ public static QueryRunner createQueryRunner(
String storageFormat)
throws Exception
{
if (prestoServerPath.isPresent()) {
checkArgument(dataDirectory.isPresent(), "Path to data files must be specified when testing external workers");
}

QueryRunner defaultQueryRunner = createJavaQueryRunner(dataDirectory, storageFormat);

if (!prestoServerPath.isPresent()) {
Expand All @@ -92,15 +82,16 @@ public static QueryRunner createQueryRunner(
return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat);
}

public static QueryRunner createJavaQueryRunner() throws Exception
public static QueryRunner createJavaQueryRunner()
throws Exception
{
return createJavaQueryRunner(DEFAULT_STORAGE_FORMAT);
}

public static QueryRunner createJavaQueryRunner(String storageFormat) throws Exception
public static QueryRunner createJavaQueryRunner(String storageFormat)
throws Exception
{
String dataDirectory = System.getProperty("DATA_DIR");
return createJavaQueryRunner(Optional.of(Paths.get(dataDirectory)), storageFormat);
return createJavaQueryRunner(Optional.of(getNativeQueryRunnerParameters().dataDirectory), storageFormat);
}

public static QueryRunner createJavaQueryRunner(Optional<Path> dataDirectory, String storageFormat)
Expand Down Expand Up @@ -193,8 +184,8 @@ public static QueryRunner createNativeQueryRunner(
if (cacheMaxSize > 0) {
Files.write(catalogDirectoryPath.resolve("hive.properties"),
format("connector.name=hive%n" +
"cache.enabled=true%n" +
"cache.max-cache-size=%s", cacheMaxSize).getBytes());
"cache.enabled=true%n" +
"cache.max-cache-size=%s", cacheMaxSize).getBytes());
}
else {
Files.write(catalogDirectoryPath.resolve("hive.properties"),
Expand Down Expand Up @@ -245,15 +236,9 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift, String stor
public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional<String> remoteFunctionServerUds)
throws Exception
{
String prestoServerPath = System.getProperty("PRESTO_SERVER");
String dataDirectory = System.getProperty("DATA_DIR");
String workerCount = System.getProperty("WORKER_COUNT");
int cacheMaxSize = 0;

assertNotNull(prestoServerPath, "Native worker binary path is missing. Add -DPRESTO_SERVER=<path/to/presto_server> to your JVM arguments.");
assertNotNull(dataDirectory, "Data directory path is missing. Add -DDATA_DIR=<path/to/data> to your JVM arguments.");

return createNativeQueryRunner(dataDirectory, prestoServerPath, Optional.ofNullable(workerCount).map(Integer::parseInt), cacheMaxSize, useThrift, remoteFunctionServerUds, storageFormat);
NativeQueryRunnerParameters nativeQueryRunnerParameters = getNativeQueryRunnerParameters();
return createNativeQueryRunner(nativeQueryRunnerParameters.dataDirectory.toString(), nativeQueryRunnerParameters.serverBinary.toString(), nativeQueryRunnerParameters.workerCount, cacheMaxSize, useThrift, remoteFunctionServerUds, storageFormat);
}

// Start the remote function server. Return the UDS path used to communicate with it.
Expand All @@ -277,6 +262,43 @@ public static String startRemoteFunctionServer(String remoteFunctionServerBinary
}
}

public static NativeQueryRunnerParameters getNativeQueryRunnerParameters()
{
Path prestoServerPath = Paths.get(Optional.ofNullable(System.getProperty("PRESTO_SERVER"))
.orElse("_build/debug/presto_cpp/main/presto_server"))
.toAbsolutePath();
Path dataDirectory = Paths.get(Optional.ofNullable(System.getProperty("DATA_DIR"))
.orElse("target/velox_data"))
.toAbsolutePath();
Optional<Integer> workerCount = Optional.ofNullable(System.getProperty("WORKER_COUNT")).map(Integer::parseInt);

assertTrue(Files.exists(prestoServerPath), format("Native worker binary at %s not found. Add -DPRESTO_SERVER=<path/to/presto_server> to your JVM arguments.", prestoServerPath));
log.info("Using PRESTO_SERVER binary at %s", prestoServerPath);

if (!Files.exists(dataDirectory)) {
assertTrue(dataDirectory.toFile().mkdirs());
}

assertTrue(Files.exists(dataDirectory), format("Data directory at %s is missing. Add -DDATA_DIR=<path/to/data> to your JVM arguments to specify the path", dataDirectory));
log.info("using DATA_DIR at %s", dataDirectory);

return new NativeQueryRunnerParameters(prestoServerPath, dataDirectory, workerCount);
}

public static class NativeQueryRunnerParameters
{
public final Path serverBinary;
public final Path dataDirectory;
public final Optional<Integer> workerCount;

public NativeQueryRunnerParameters(Path serverBinary, Path dataDirectory, Optional<Integer> workerCount)
{
this.serverBinary = requireNonNull(serverBinary, "serverBinary is null");
this.dataDirectory = requireNonNull(dataDirectory, "dataDirectory is null");
this.workerCount = requireNonNull(workerCount, "workerCount is null");
}
}

public static void setupJsonFunctionNamespaceManager(QueryRunner queryRunner, String jsonFileName, String catalogName)
{
String jsonDefinitionPath = Resources.getResource(jsonFileName).getFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
import static com.facebook.airlift.log.Level.WARN;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerHiveProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerSystemProperties;
import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.getNativeQueryRunnerParameters;
import static com.facebook.presto.spark.PrestoSparkQueryRunner.METASTORE_CONTEXT;
import static java.nio.file.Files.createTempDirectory;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

Expand Down Expand Up @@ -94,10 +94,7 @@ public static Map<String, String> getNativeExecutionSessionConfigs()
.put("spark.partition-count-auto-tune-enabled", "false");

if (System.getProperty("NATIVE_PORT") == null) {
String path = requireNonNull(System.getProperty("PRESTO_SERVER"),
"Native worker binary path is missing. " +
"Add -DPRESTO_SERVER=/path/to/native/process/bin to your JVM arguments.");
builder.put("native-execution-executable-path", path);
builder.put("native-execution-executable-path", getNativeQueryRunnerParameters().serverBinary.toString());
}

try {
Expand Down Expand Up @@ -211,7 +208,7 @@ public static synchronized Path getBaseDataPath()
return dataDirectory.get();
}
String dataDirectoryStr = System.getProperty("DATA_DIR");
if (dataDirectoryStr.isEmpty()) {
if (dataDirectoryStr == null || dataDirectoryStr.isEmpty()) {
try {
dataDirectory = Optional.of(createTempDirectory("PrestoTest").toAbsolutePath());
}
Expand All @@ -220,7 +217,7 @@ public static synchronized Path getBaseDataPath()
}
}
else {
dataDirectory = Optional.of(Paths.get(dataDirectoryStr));
dataDirectory = Optional.of(getNativeQueryRunnerParameters().dataDirectory);
}
return dataDirectory.get();
}
Expand Down

0 comments on commit c6292d0

Please sign in to comment.