Skip to content

Commit

Permalink
[native] Add Arrow Flight Connector
Browse files Browse the repository at this point in the history
The native Arrow Flight connector can be used to connect to any Arrow Flight
enabled Data Source. The metadata layer is handled by the Presto coordinator
and does not need to be re-implemented in C++. Any Java connector that inherits
from `presto-base-arrow-flight` can use this connector as it's counterpart for
the Prestissimo layer.

Different Arrow-Flight enabled data sources can differ in authentication styles.
A plugin-style interface is provided to handle such cases with custom
authentication code by extending `arrow_flight::auth::Authenticator`.

RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0004-arrow-flight-connector.md#prestissimo-implementation

Co-authored-by: Ashwin Kumar <[email protected]>
Co-authored-by: Rijin-N <[email protected]>
Co-authored-by: Nischay Yadav <[email protected]>
  • Loading branch information
3 people authored and BryanCutler committed Feb 5, 2025
1 parent 4464129 commit c9b9df2
Show file tree
Hide file tree
Showing 65 changed files with 3,444 additions and 14 deletions.
2 changes: 2 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR "Enable Arrow Flight connector" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ endif
ifneq ($(PRESTO_MEMORY_CHECKER_TYPE),)
EXTRA_CMAKE_FLAGS += -DPRESTO_MEMORY_CHECKER_TYPE=$(PRESTO_MEMORY_CHECKER_TYPE)
endif
ifneq ($(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR),)
EXTRA_CMAKE_FLAGS += -DPRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR=$(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
endif

CMAKE_FLAGS := -DTREAT_WARNINGS_AS_ERRORS=${TREAT_WARNINGS_AS_ERRORS}
CMAKE_FLAGS += -DENABLE_ALL_WARNINGS=${ENABLE_WALL}
Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ follow these steps:
* For development, use `make debug` to build a non-optimized debug version.
* Use `make unittest` to build and run tests.

#### Arrow Flight Connector
To enable Arrow Flight connector support, set the environment variable:
`PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR = "ON"`.

The Arrow Flight connector requires the Arrow Flight library. You can install this dependency
by running the following script from the `presto/presto-native-execution` directory:

`./scripts/setup-adapters.sh arrow_flight`

### Makefile Targets
A reminder of the available Makefile targets can be obtained using `make help`
```
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_subdirectory(types)
add_subdirectory(http)
add_subdirectory(common)
add_subdirectory(thrift)
add_subdirectory(connectors)

add_library(
presto_server_lib
Expand Down Expand Up @@ -48,6 +49,7 @@ target_link_libraries(
presto_common
presto_exception
presto_function_metadata
presto_connector
presto_http
presto_operators
presto_velox_conversion
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/connectors/ConnectorRegistration.h"
#include "presto_cpp/main/http/HttpConstants.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
Expand Down Expand Up @@ -280,6 +281,8 @@ void PrestoServer::run() {
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

presto::connector::registerAllPrestoConnectors();

velox::exec::OutputBufferManager::initialize({});
initializeVeloxMemory();
initializeThreadPools();
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/SystemConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ std::unique_ptr<velox::connector::ConnectorSplit>
SystemPrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* const connectorSplit,
const protocol::SplitContext* splitContext) const {
const protocol::SplitContext* splitContext,
const std::map<std::string, std::string>& extraCredentials) const {
auto systemSplit = dynamic_cast<const protocol::SystemSplit*>(connectorSplit);
VELOX_CHECK_NOT_NULL(
systemSplit, "Unexpected split type {}", connectorSplit->_type);
Expand Down
4 changes: 3 additions & 1 deletion presto-native-execution/presto_cpp/main/SystemConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ class SystemPrestoToVeloxConnector final : public PrestoToVeloxConnector {
std::unique_ptr<velox::connector::ConnectorSplit> toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const final;
const protocol::SplitContext* splitContext,
const std::map<std::string, std::string>& extraCredentials = {})
const final;

std::unique_ptr<velox::connector::ColumnHandle> toVeloxColumnHandle(
const protocol::ColumnHandle* column,
Expand Down
5 changes: 4 additions & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateTask(
updateRequest.sources,
updateRequest.outputIds,
summarize,
updateRequest.extraCredentials,
std::move(queryCtx),
startProcessCpuTime);
}
Expand All @@ -493,6 +494,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateBatchTask(
updateRequest.sources,
updateRequest.outputIds,
summarize,
updateRequest.extraCredentials,
std::move(queryCtx),
startProcessCpuTime);
}
Expand All @@ -503,6 +505,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
const std::vector<protocol::TaskSource>& sources,
const protocol::OutputBuffers& outputBuffers,
bool summarize,
const std::map<std::string, std::string>& extraCredentials,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime) {
std::shared_ptr<exec::Task> execTask;
Expand Down Expand Up @@ -606,7 +609,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
// Keep track of the max sequence for this batch of splits.
long maxSplitSequenceId{-1};
for (const auto& protocolSplit : source.splits) {
auto split = toVeloxSplit(protocolSplit);
auto split = toVeloxSplit(protocolSplit, extraCredentials);
if (split.hasConnectorSplit()) {
maxSplitSequenceId =
std::max(maxSplitSequenceId, protocolSplit.sequenceId);
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class TaskManager {
const std::vector<protocol::TaskSource>& sources,
const protocol::OutputBuffers& outputBuffers,
bool summarize,
const std::map<std::string, std::string>& extraCredentials,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime);

Expand Down
20 changes: 20 additions & 0 deletions presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
add_subdirectory(arrow_flight)
endif()

add_library(presto_connector ConnectorRegistration.cpp)

if(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
target_link_libraries(presto_connector presto_flight_connector)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/connectors/ConnectorRegistration.h"

#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
#include "presto_cpp/main/connectors/arrow_flight/ArrowFlightConnector.h"
#include "presto_cpp/main/connectors/arrow_flight/ArrowPrestoToVeloxConnector.h"
#endif

namespace facebook::presto::connector {

void registerAllPrestoConnectors() {
#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
registerPrestoToVeloxConnector(
std::make_unique<
presto::connector::arrow_flight::ArrowPrestoToVeloxConnector>(
"arrow-flight"));

if (!velox::connector::hasConnectorFactory(
presto::connector::arrow_flight::ArrowFlightConnectorFactory::
kArrowFlightConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<
presto::connector::arrow_flight::ArrowFlightConnectorFactory>());
}
#endif
}

} // namespace facebook::presto::connector
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

namespace facebook::presto::connector {

void registerAllPrestoConnectors();

} // namespace facebook::presto::connector
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/connectors/arrow_flight/ArrowFlightConnector.h"

#include <utility>
#include "arrow/c/abi.h"
#include "arrow/c/bridge.h"
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/connectors/arrow_flight/Macros.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::presto::connector::arrow_flight {

using namespace arrow::flight;
using namespace velox;
using namespace velox::connector;

// Wrapper for CallOptions which does not add any member variables,
// but provides a write-only interface for adding call headers.
class CallOptionsAddHeaders : public FlightCallOptions, public AddCallHeaders {
public:
void AddHeader(const std::string& key, const std::string& value) override {
headers.emplace_back(key, value);
}
};

std::optional<Location> ArrowFlightConnector::getDefaultLocation(
const std::shared_ptr<FlightConfig>& config) {
auto defaultHost = config->defaultServerHostname();
auto defaultPort = config->defaultServerPort();
if (!defaultHost.has_value() || !defaultPort.has_value()) {
return std::nullopt;
}

bool defaultSslEnabled = config->defaultServerSslEnabled();
AFC_RETURN_OR_RAISE(
defaultSslEnabled
? Location::ForGrpcTls(defaultHost.value(), defaultPort.value())
: Location::ForGrpcTcp(defaultHost.value(), defaultPort.value()));
}

std::shared_ptr<arrow::flight::FlightClientOptions>
ArrowFlightConnector::initClientOpts(
const std::shared_ptr<FlightConfig>& config) {
auto clientOpts = std::make_shared<FlightClientOptions>();
clientOpts->disable_server_verification = !config->serverVerify();

auto certPath = config->serverSslCertificate();
if (certPath.has_value()) {
std::ifstream file(certPath.value());
VELOX_CHECK(file.is_open(), "Could not open TLS certificate");
std::string cert(
(std::istreambuf_iterator<char>(file)),
(std::istreambuf_iterator<char>()));
clientOpts->tls_root_certs = cert;
}

return clientOpts;
}

FlightDataSource::FlightDataSource(
const RowTypePtr& outputType,
const std::unordered_map<std::string, std::shared_ptr<ColumnHandle>>&
columnHandles,
std::shared_ptr<auth::Authenticator> authenticator,
memory::MemoryPool* pool,
const std::shared_ptr<FlightConfig>& flightConfig,
const std::shared_ptr<arrow::flight::FlightClientOptions>& clientOpts,
const std::optional<arrow::flight::Location>& defaultLocation)
: outputType_{outputType},
authenticator_{std::move(authenticator)},
pool_{pool},
flightConfig_{flightConfig},
clientOpts_{clientOpts},
defaultLocation_{defaultLocation} {
// columnMapping_ contains the real column names in the expected order.
// This is later used by projectOutputColumns to filter out unnecessary
// columns from the fetched chunk.
columnMapping_.reserve(outputType_->size());

for (const auto& columnName : outputType_->names()) {
auto it = columnHandles.find(columnName);
VELOX_CHECK(
it != columnHandles.end(),
"missing columnHandle for column '{}'",
columnName);

auto handle = std::dynamic_pointer_cast<FlightColumnHandle>(it->second);
VELOX_CHECK_NOT_NULL(
handle,
"handle for column '{}' is not an FlightColumnHandle",
columnName);

columnMapping_.push_back(handle->name());
}
}

void FlightDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
auto flightSplit = std::dynamic_pointer_cast<FlightSplit>(split);
VELOX_CHECK(flightSplit, "FlightDataSource received wrong type of split");

FlightEndpoint flightEndpoint;
AFC_ASSIGN_OR_RAISE(
flightEndpoint,
arrow::flight::FlightEndpoint::Deserialize(
flightSplit->flightEndpointBytes));

Location loc;
if (!flightEndpoint.locations.empty()) {
loc = flightEndpoint.locations[0];
} else {
VELOX_CHECK(
defaultLocation_.has_value(),
"Split has empty Location list, but default host or port is missing");
loc = defaultLocation_.value();
}
VELOX_CHECK_NOT_NULL(clientOpts_, "FlightClientOptions is not initialized");

AFC_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(loc, *clientOpts_));

CallOptionsAddHeaders callOptsAddHeaders{};
authenticator_->authenticateClient(
client, flightSplit->extraCredentials, callOptsAddHeaders);

auto readerResult = client->DoGet(callOptsAddHeaders, flightEndpoint.ticket);
AFC_ASSIGN_OR_RAISE(currentReader_, readerResult);
}

std::optional<RowVectorPtr> FlightDataSource::next(
uint64_t size,
velox::ContinueFuture& /* unused */) {
VELOX_CHECK_NOT_NULL(currentReader_, "Missing split, call addSplit() first");

AFC_ASSIGN_OR_RAISE(auto chunk, currentReader_->Next());

// Null values in the chunk indicates that the Flight stream is complete.
if (!chunk.data) {
currentReader_ = nullptr;
return nullptr;
}

// Extract only required columns from the record batch as a velox RowVector.
auto output = projectOutputColumns(chunk.data);

completedRows_ += output->size();
completedBytes_ += output->inMemoryBytes();
return output;
}

RowVectorPtr FlightDataSource::projectOutputColumns(
const std::shared_ptr<arrow::RecordBatch>& input) {
std::vector<VectorPtr> children;
children.reserve(columnMapping_.size());

// Extract and convert desired columns in the correct order.
for (const auto& name : columnMapping_) {
auto column = input->GetColumnByName(name);
VELOX_CHECK_NOT_NULL(column, "column with name '{}' not found", name);
ArrowArray array;
ArrowSchema schema;
AFC_RAISE_NOT_OK(arrow::ExportArray(*column, &array, &schema));
children.push_back(importFromArrowAsOwner(schema, array, pool_));
}

return std::make_shared<RowVector>(
pool_,
outputType_,
BufferPtr() /*nulls*/,
input->num_rows(),
std::move(children));
}

} // namespace facebook::presto::connector::arrow_flight
Loading

0 comments on commit c9b9df2

Please sign in to comment.