Skip to content

Commit

Permalink
Add Exchange source for unsaferow blocks
Browse files Browse the repository at this point in the history
Also adds an end-to-end test that simulates an external shuffle service
for UnsafeRow blocks.
  • Loading branch information
beroyfb authored and spershin committed Sep 21, 2022
1 parent 0486076 commit a6e84c0
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
add_library(
presto_operators
PartitionAndSerialize.cpp
ShuffleWrite.cpp)
ShuffleWrite.cpp
UnsafeRowExchangeSource.cpp)

target_link_libraries(
presto_operators
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"

namespace facebook::presto::operators {

/// Pulls at most one block for `destination_` from the shuffle and enqueues
/// it on the exchange queue as a SerializedPage.
///
/// The queue's mutex is held for the whole call, so the `atEnd_` update and
/// the enqueue happen under the same lock. When the shuffle reports no more
/// data for this destination, `atEnd_` is set and a null page is enqueued
/// (presumably the queue's end-of-stream marker for this source -- confirm
/// against ExchangeQueue).
void UnsafeRowExchangeSource::request() {
  std::lock_guard<std::mutex> l(queue_->mutex());

  if (!shuffle_->hasNext(destination_)) {
    atEnd_ = true;
    queue_->enqueue(nullptr);
    return;
  }

  // NOTE(review): the meaning of the second argument is defined by
  // ShuffleInterface::next(), which is not visible from this file.
  auto buffer = shuffle_->next(destination_, true);

  // The IOBuf only wraps the buffer's bytes; it does not own them.
  auto ioBuf = folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size());

  // The by-value capture of `buffer` is what keeps the wrapped bytes alive:
  // the copy lives inside the page's destruction callback and is dropped when
  // the page (and with it the callback) is destroyed. The callback body must
  // NOT call buffer->release() on top of that. Assuming `buffer` is a
  // ref-counted velox::BufferPtr, an explicit release() is an extra decrement
  // that frees the buffer while the captured pointer still refers to it, and
  // the pointer's own destructor would then touch freed memory.
  queue_->enqueue(std::make_unique<velox::exec::SerializedPage>(
      std::move(ioBuf), pool_, [buffer](auto& /*unused*/) {}));
}
}; // namespace facebook::presto::operators
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "presto_cpp/main/operators/ShuffleWrite.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/Operator.h"

namespace facebook::presto::operators {

class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {
public:
UnsafeRowExchangeSource(
const std::string& taskId,
int destination,
std::shared_ptr<velox::exec::ExchangeQueue> queue,
ShuffleInterface* shuffle,
velox::memory::MemoryPool* pool)
: ExchangeSource(taskId, destination, queue, pool), shuffle_(shuffle) {}

bool shouldRequestLocked() override {
return !atEnd_;
}

void request() override;

void close() override {}

private:
ShuffleInterface* shuffle_;
};
} // namespace facebook::presto::operators
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "folly/init/Init.h"
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/operators/ShuffleWrite.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/tests/utils/OperatorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
Expand Down Expand Up @@ -108,6 +109,22 @@ class TestShuffle : public ShuffleInterface {
std::vector<std::vector<BufferPtr>> readyPartitions_;
};

/// Registers an ExchangeSource factory that serves task ids starting with
/// "spark://" from the given shuffle.
///
/// For any other task id the factory returns nullptr.
///
/// @param shuffle Shuffle handed to every UnsafeRowExchangeSource the factory
///        creates. The factory keeps only the raw pointer, so `shuffle` must
///        outlive every use of the registered factory.
void registerExchangeSource(ShuffleInterface* shuffle) {
  // Task-id prefix handled by this factory. The compared length is derived
  // from the literal so the two cannot drift apart.
  static constexpr char kShuffleTaskPrefix[] = "spark://";

  exec::ExchangeSource::registerFactory(
      [shuffle](
          const std::string& taskId,
          int destination,
          std::shared_ptr<exec::ExchangeQueue> queue,
          memory::MemoryPool* FOLLY_NONNULL pool)
          -> std::unique_ptr<exec::ExchangeSource> {
        // compare(0, n, s) clamps to the string's length, so a task id
        // shorter than the prefix simply compares unequal.
        const bool isShuffleTask =
            taskId.compare(
                0, sizeof(kShuffleTaskPrefix) - 1, kShuffleTaskPrefix) == 0;
        if (!isShuffleTask) {
          return nullptr;
        }
        return std::make_unique<UnsafeRowExchangeSource>(
            taskId, destination, std::move(queue), shuffle, pool);
      });
}

auto addPartitionAndSerializeNode(int numPartitions) {
return [numPartitions](
core::PlanNodeId nodeId,
Expand Down Expand Up @@ -136,6 +153,10 @@ auto addShuffleWriteNode(ShuffleInterface* shuffle) {
class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
protected:

// Overrides the fixture's serde registration hook so that the Spark UnsafeRow
// serde is the one registered for these tests.
void registerVectorSerde() override {
serializer::spark::UnsafeRowVectorSerde::registerVectorSerde();
}

// Builds a task id of the form "spark://<prefix>-<num>", e.g.
// makeTaskId("leaf", 0) yields "spark://leaf-0".
static std::string makeTaskId(const std::string& prefix, int num) {
  std::string taskId = fmt::format("spark://{}-{}", prefix, num);
  return taskId;
}
Expand Down Expand Up @@ -212,7 +233,6 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
serializer->deserialize(input.get(), pool(), rowType, &result, nullptr);
return result;
}

};

TEST_F(UnsafeRowShuffleTest, operators) {
Expand Down Expand Up @@ -243,6 +263,61 @@ TEST_F(UnsafeRowShuffleTest, operators) {
ASSERT_EQ(serializedResults.size(), 0);
}

// End-to-end test simulating an external shuffle service: a leaf task
// partitions the input and writes it to a TestShuffle, then one downstream
// task per partition reads it back through an Exchange. The only check made
// on the read side is that each partition of the shuffle has been drained.
TEST_F(UnsafeRowShuffleTest, endToEnd) {
  exec::Operator::registerOperator(
      std::make_unique<PartitionAndSerializeTranslator>());
  exec::Operator::registerOperator(std::make_unique<ShuffleWriteTranslator>());

  size_t numPartitions = 5;
  TestShuffle shuffle(pool(), numPartitions, 1 << 20 /* 1MB */);

  // The registered factory keeps a raw pointer to `shuffle`, which lives on
  // this test's stack.
  registerExchangeSource(&shuffle);

  // Create and run single leaf task to partition data and write it to shuffle.
  auto data = makeRowVector({
      makeFlatVector<int32_t>({1, 2, 3, 4, 5, 6}),
      makeFlatVector<int64_t>({10, 20, 30, 40, 50, 60}),
  });

  auto dataType = asRowType(data->type());

  auto leafPlan = exec::test::PlanBuilder()
                      .values({data}, true)
                      .addNode(addPartitionAndSerializeNode(numPartitions))
                      .localPartition({})
                      .addNode(addShuffleWriteNode(&shuffle))
                      .planNode();

  auto leafTaskId = makeTaskId("leaf", 0);
  auto leafTask = makeTask(leafTaskId, leafPlan, 0);
  exec::Task::start(leafTask, 2);
  ASSERT_TRUE(exec::test::waitForTaskCompletion(leafTask.get()));

  // Create and run multiple downstream tasks, one per partition, to read data
  // from shuffle.
  //
  // The loop index doubles as the task destination, which is used as an int
  // below, so iterate with an int and convert the size_t bound once instead
  // of comparing signed against unsigned on every iteration.
  const int numDestinations = static_cast<int>(numPartitions);
  for (int i = 0; i < numDestinations; ++i) {
    auto plan = exec::test::PlanBuilder()
                    .exchange(dataType)
                    .project(dataType->names())
                    .planNode();

    exec::test::CursorParameters params;
    params.planNode = plan;
    params.destination = i;

    // The callback may be invoked more than once; add the remote split only
    // on the first invocation.
    bool noMoreSplits = false;
    auto [taskCursor, serializedResults] = readCursor(params, [&](auto* task) {
      if (noMoreSplits) {
        return;
      }
      addRemoteSplits(task, {leafTaskId});
      noMoreSplits = true;
    });

    ASSERT_FALSE(shuffle.hasNext(i)) << i;
  }
}

TEST_F(UnsafeRowShuffleTest, partitionAndSerializeOperator) {
auto data = makeRowVector({
makeFlatVector<int32_t>(1'000, [](auto row) { return row; }),
Expand Down Expand Up @@ -299,7 +374,7 @@ TEST_F(UnsafeRowShuffleTest, ShuffleWriterToString) {
}

TEST_F(UnsafeRowShuffleTest, partitionAndSerializeToString) {
auto data = vectorMaker_.rowVector({
auto data = makeRowVector({
makeFlatVector<int32_t>(1'000, [](auto row) { return row; }),
makeFlatVector<int64_t>(1'000, [](auto row) { return row * 10; }),
});
Expand Down

0 comments on commit a6e84c0

Please sign in to comment.