Skip to content

Commit

Permalink
[native] Add serde parameter to remote function server configuration
Browse files Browse the repository at this point in the history
Make the serde method used for communication with remote function server
configurable through the config parameter
"remote-function-server.serde".
  • Loading branch information
pedroerp authored and arhimondr committed Nov 8, 2023
1 parent d35917c commit 08b771c
Showing 7 changed files with 55 additions and 9 deletions.
10 changes: 10 additions & 0 deletions presto-docs/src/main/sphinx/develop/presto-native.rst
Original file line number Diff line number Diff line change
@@ -76,6 +76,16 @@ The following properties allow the configuration of remote function execution:

If empty, the function is registered as ``schema.function_name``.

``remote-function-server.serde``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``"presto_page"``

The serialization/deserialization method to use when communicating with
the remote function server. Supported values are ``presto_page`` or
``spark_unsafe_row``.

``remote-function-server.thrift.address``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

6 changes: 4 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
@@ -813,9 +813,11 @@ void PrestoServer::registerRemoteFunctions() {
PRESTO_STARTUP_LOG(INFO)
<< "Registering remote functions from path: " << *dirPath;
if (auto remoteLocation = systemConfig->remoteFunctionServerLocation()) {
auto catalogName = systemConfig->remoteFunctionServerCatalogName();
const auto catalogName = systemConfig->remoteFunctionServerCatalogName();
const auto serdeName = systemConfig->remoteFunctionServerSerde();
size_t registeredCount = presto::registerRemoteFunctions(
*dirPath, *remoteLocation, catalogName);
*dirPath, *remoteLocation, catalogName, serdeName);

PRESTO_STARTUP_LOG(INFO)
<< registeredCount << " remote functions registered in the '"
<< catalogName << "' catalog.";
Original file line number Diff line number Diff line change
@@ -22,30 +22,44 @@
namespace facebook::presto {
namespace {

using velox::functions::remote::PageFormat;

std::string genFunctionName(
const std::string& baseFunctionName,
const std::string& schemaName,
const std::string& prefix) {
const std::string_view& prefix) {
auto name = schemaName.empty()
? baseFunctionName
: fmt::format("{}.{}", schemaName, baseFunctionName);

return prefix.empty() ? name : fmt::format("{}.{}", prefix, name);
}

PageFormat fromSerdeString(const std::string_view& serdeName) {
if (serdeName == "presto_page") {
return PageFormat::PRESTO_PAGE;
} else if (serdeName == "spark_unsafe_row") {
return PageFormat::SPARK_UNSAFE_ROW;
} else {
VELOX_FAIL("Unknown serde name for remote function server: '{}'", serdeName)
}
}

// Reads file at `filePath`, decodes the json signatures and registers them as
// remote functions pointing to `location`. Returns the number of signatures
// registered.
size_t processFile(
const fs::path& filePath,
const folly::SocketAddress& location,
const std::string& prefix) {
const std::string_view& prefix,
const std::string_view& serde) {
std::ifstream stream{filePath};
std::stringstream buffer;
buffer << stream.rdbuf();

velox::functions::RemoteVectorFunctionMetadata metadata;
metadata.location = location;
metadata.serdeFormat = fromSerdeString(serde);

// First group possible functions with the same name but different
// schemas.
@@ -75,18 +89,19 @@ size_t processFile(
size_t registerRemoteFunctions(
const std::string& inputPath,
const folly::SocketAddress& location,
const std::string& prefix) {
const std::string_view& prefix,
const std::string_view& serde) {
size_t signaturesCount = 0;
const fs::path path{inputPath};

if (fs::is_directory(path)) {
for (auto& entryPath : fs::recursive_directory_iterator(path)) {
if (entryPath.is_regular_file()) {
signaturesCount += processFile(entryPath, location, prefix);
signaturesCount += processFile(entryPath, location, prefix, serde);
}
}
} else if (fs::is_regular_file(path)) {
signaturesCount += processFile(path, location, prefix);
signaturesCount += processFile(path, location, prefix, serde);
}
return signaturesCount;
}
Original file line number Diff line number Diff line change
@@ -29,10 +29,15 @@ namespace facebook::presto {
/// `prefix`, if not empty, it is added to the names of functions registered
/// using '.' as a separator, e.g., 'prefix.functionName'.
///
/// `serde` controls the serialization/deserialization format to be used when
/// communicating with the remote server. "presto_page" and "spark_unsafe_row"
/// are supported formats.
///
/// Returns the number of signatures registered.
size_t registerRemoteFunctions(
const std::string& inputPath,
const folly::SocketAddress& location,
const std::string& prefix = "");
const std::string_view& prefix = "",
const std::string_view& serde = "presto_page");

} // namespace facebook::presto
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
@@ -170,6 +170,7 @@ SystemConfig::SystemConfig() {
NUM_PROP(kLocalShuffleMaxPartitionBytes, 268435456),
STR_PROP(kShuffleName, ""),
STR_PROP(kRemoteFunctionServerCatalogName, ""),
STR_PROP(kRemoteFunctionServerSerde, "presto_page"),
STR_PROP(kHttpEnableAccessLog, "false"),
STR_PROP(kHttpEnableStatsFilter, "false"),
STR_PROP(kHttpEnableEndpointLatencyFilter, "false"),
@@ -288,6 +289,10 @@ std::string SystemConfig::remoteFunctionServerCatalogName() const {
return optionalProperty(kRemoteFunctionServerCatalogName).value();
}

std::string SystemConfig::remoteFunctionServerSerde() const {
return optionalProperty(kRemoteFunctionServerSerde).value();
}

int32_t SystemConfig::maxDriversPerTask() const {
return optionalProperty<int32_t>(kMaxDriversPerTask).value();
}
10 changes: 9 additions & 1 deletion presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
@@ -337,10 +337,16 @@ class SystemConfig : public ConfigBase {
"remote-function-server.signature.files.directory.path"};

/// Optional catalog name to be added as a prefix to the function names
/// registered. The patter registered is `catalog.schema.function_name`.
/// registered. The pattern registered is `catalog.schema.function_name`.
static constexpr std::string_view kRemoteFunctionServerCatalogName{
"remote-function-server.catalog-name"};

/// Optional string containing the serialization/deserialization format to be
/// used when communicating with the remote server. Supported types are
/// "spark_unsafe_row" or "presto_page" ("presto_page" by default).
static constexpr std::string_view kRemoteFunctionServerSerde{
"remote-function-server.serde"};

/// Options to configure the internal (in-cluster) JWT authentication.
static constexpr std::string_view kInternalCommunicationJwtEnabled{
"internal-communication.jwt.enabled"};
@@ -403,6 +409,8 @@ class SystemConfig : public ConfigBase {

std::string remoteFunctionServerCatalogName() const;

std::string remoteFunctionServerSerde() const;

int32_t maxDriversPerTask() const;

int32_t concurrentLifespansPerTask() const;
Original file line number Diff line number Diff line change
@@ -184,6 +184,7 @@ public static QueryRunner createNativeQueryRunner(
configProperties = format("%s%n" +
"remote-function-server.catalog-name=%s%n" +
"remote-function-server.thrift.uds-path=%s%n" +
"remote-function-server.serde=presto_page%n" +
"remote-function-server.signature.files.directory.path=%s%n", configProperties, REMOTE_FUNCTION_CATALOG_NAME, remoteFunctionServerUds.get(), jsonSignaturesPath);
}
Files.write(tempDirectoryPath.resolve("config.properties"), configProperties.getBytes());

0 comments on commit 08b771c

Please sign in to comment.