[native] Operators for unsaferow shuffle #18327

Merged
merged 3 commits into from
Sep 21, 2022

Conversation

Contributor

@beroyfb beroyfb commented Sep 12, 2022

The new operators include:

  1. PartitionAndSerialize, which partitions rows and serializes each row of the row vector into a vector of strings (one string per serialized row). It uses the Hive partition function. Later we may add murmur3 hash as well.
  2. ShuffleWrite, which writes the serialized vectors to an external shuffle system.
  3. An extension of ExchangeSource that receives serialized unsaferow blocks and outputs row vectors after deserialization.
  4. An interface for an external shuffle.
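
Editorial sketch of the partition-and-serialize step described in item 1. It is not the PR's code: `Row`, `serializeRow`, and `partitionOf` are hypothetical names, `std::hash` stands in for the Hive partition function, and the byte layout is a placeholder rather than the UnsafeRow format. It only illustrates the shape of the output, one (partition, serialized row) pair per input row.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical row type: one integer key plus one string payload.
struct Row {
  int32_t key;
  std::string value;
};

// Placeholder serializer (NOT the UnsafeRow format): the raw key bytes
// followed by the payload bytes.
std::string serializeRow(const Row& row) {
  std::string out(reinterpret_cast<const char*>(&row.key), sizeof(row.key));
  out += row.value;
  return out;
}

// Placeholder partition function (NOT the Hive hash): std::hash of the key
// modulo the number of partitions. Assumes numPartitions > 0.
int32_t partitionOf(const Row& row, int32_t numPartitions) {
  return static_cast<int32_t>(std::hash<int32_t>{}(row.key) % numPartitions);
}

// Emits one (partition, data) pair per input row.
std::vector<std::pair<int32_t, std::string>> partitionAndSerialize(
    const std::vector<Row>& rows,
    int32_t numPartitions) {
  std::vector<std::pair<int32_t, std::string>> result;
  result.reserve(rows.size());
  for (const auto& row : rows) {
    result.emplace_back(partitionOf(row, numPartitions), serializeRow(row));
  }
  return result;
}
```

Rows with equal keys always land in the same partition, which is the property a downstream shuffle relies on.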

Test plan - presto_cpp/main/operators/tests/UnsaferowShuffleTest.cpp is an end-to-end test that covers a basic shuffle and all the new operators.

== NO RELEASE NOTE ==

@beroyfb beroyfb requested a review from a team as a code owner September 12, 2022 05:21
@linux-foundation-easycla
linux-foundation-easycla bot commented Sep 12, 2022

CLA Signed

The committers listed above are authorized under a signed CLA.

  • ✅ login: beroyfb (4d81db3a5c57397247069fce2b452bf0036b4270)

@beroyfb beroyfb force-pushed the sapphire-shuffle-operators branch 2 times, most recently from d94ec59 to 4d81db3 Compare September 12, 2022 18:08
Contributor

@mbasmanova mbasmanova left a comment

@beroyfb Looks good overall. This is a large change with many separate pieces. It would be nice to break it up into multiple commits. Perhaps,

  • Add PartitionAndSerialize operator.
  • Add UnsafeRowExchangeSource.
  • Add ShuffleInterface.
  • Add ShuffleWrite operator.


// Initializing hive partition function
auto numPartitions = planNode->numPartitions();
VELOX_CHECK(!planNode->keys().empty(), "Empty keys for hive hash!");
Contributor

Perhaps move this check to the plan node constructor so that it fails earlier.
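
Editorial sketch of what "fail earlier" could look like. It is a standalone stand-in, not the PR's class: `PartitionAndSerializeNodeSketch` is a hypothetical name, and `std::invalid_argument` replaces the Velox check macro so the example compiles without Velox.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the plan node. The real node derives from
// velox::core::PlanNode and would use a Velox check macro instead of throwing
// std::invalid_argument.
class PartitionAndSerializeNodeSketch {
 public:
  PartitionAndSerializeNodeSketch(
      std::vector<std::string> keys,
      int numPartitions)
      : keys_(std::move(keys)), numPartitions_(numPartitions) {
    // Validate at construction so an invalid plan is rejected before any
    // operator is created from it.
    if (keys_.empty()) {
      throw std::invalid_argument("Empty keys for hive hash");
    }
  }

  const std::vector<std::string>& keys() const {
    return keys_;
  }

  int numPartitions() const {
    return numPartitions_;
  }

 private:
  std::vector<std::string> keys_;
  int numPartitions_;
};
```

With the check in the constructor, the operator can assume non-empty keys and does not need to repeat the validation.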

auto inputType = planNode->sources()[0]->outputType();
auto keyChannels = toChannels(inputType, planNode->keys());

// Initializing hive partition function
Contributor

Comments should be full sentences, e.g. start with a capital letter and end with a period. Please, fix throughout the PR.

// Write to the shuffle one row at a time
virtual void collect(int32_t partition, std::string_view data) = 0;
// Tell the shuffle system the writer is done
virtual void noMoreData() = 0;
Contributor

Don't we need to pass a boolean flag indicating whether we are closing normally or aborting on error?

Contributor Author

Good point. You are right.
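
Editorial sketch of the suggestion above: a writer interface whose `noMoreData` takes a success flag. The names `ShuffleWriterSketch` and `InMemoryShuffleWriter` are hypothetical, and discarding buffered rows on abort is just one possible policy, assumed here for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical writer-side interface with an explicit success flag.
class ShuffleWriterSketch {
 public:
  virtual ~ShuffleWriterSketch() = default;

  // Write to the shuffle one row at a time.
  virtual void collect(int32_t partition, std::string_view data) = 0;

  // Tell the shuffle system the writer is done. Passing success == false
  // signals that the writer is aborting on error.
  virtual void noMoreData(bool success) = 0;
};

// Hypothetical in-memory implementation used only to illustrate the contract.
class InMemoryShuffleWriter : public ShuffleWriterSketch {
 public:
  void collect(int32_t partition, std::string_view data) override {
    rows_[partition].emplace_back(data);
  }

  void noMoreData(bool success) override {
    if (!success) {
      // Assumed policy: drop everything written so far when aborting.
      rows_.clear();
    }
    finished_ = true;
  }

  bool finished() const {
    return finished_;
  }

  size_t rowCount(int32_t partition) const {
    auto it = rows_.find(partition);
    return it == rows_.end() ? 0 : it->second.size();
  }

 private:
  std::map<int32_t, std::vector<std::string>> rows_;
  bool finished_ = false;
};
```

The flag lets an implementation distinguish a clean close, where data must become visible to readers, from an abort, where partial output can be discarded.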

public:
// Write to the shuffle one row at a time
virtual void collect(int32_t partition, std::string_view data) = 0;
// Tell the shuffle system the writer is done
Contributor

Add empty lines between methods.

}
} // namespace

class UnsaferowShuffleTest : public exec::test::OperatorTestBase {
Contributor

typo: Unsaferow -> UnsafeRow

Contributor

Still needs to be fixed.

}

for (auto& result : serializedResults) {
std::cout << "Partition: " << i << ": " << result->toString()
Contributor

Remove print statements from the tests.


namespace facebook::presto::operators {

class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {
Contributor

Typo in the file name: Unsaferow -> UnsafeRow.

auto plan =
exec::test::PlanBuilder()
.values({data}, true)
.addNode(
Contributor

use addPartitionAndSerializeNode

return rowVec;
}

void assertEqualVectors(const VectorPtr& expected, const VectorPtr& actual) {
Contributor

Can we use the existing function from velox/vector/tests/VectorTestBase.h ?

Contributor Author

I tried, but it doesn't seem to be reference-able in presto_cpp.

Contributor

doesn't seem to be reference-able in presto_cpp

Hmm... what does that mean? What happens if you try using this function?

Contributor Author

The body of assertEqualVectors is located under VeloxVectorTest.cpp which is under velox_vector_test_lib. When I add that lib to CMake file I get a link error:

ld: library not found for -lvelox_vector_test_lib
clang: error: linker command failed with exit code 1 (use -v to see invocation)

Contributor

Aha... This needs to be fixed.

In Velox, there are 2 build variables that control what test code is built: VELOX_BUILD_TESTING and VELOX_BUILD_TEST_UTILS. VELOX_BUILD_TESTING is intended to allow for disabling building of tests (to speed up builds). VELOX_BUILD_TEST_UTILS is intended to allow for disabling building test utilities.

VectorTestBase.* files are test utilities, however, their build is guarded by VELOX_BUILD_TESTING, not VELOX_BUILD_TEST_UTILS.

presto-native-execution/velox/velox/vector/CMakeLists.txt

if(${VELOX_BUILD_TESTING} OR ${VELOX_ENABLE_BENCHMARKS_BASIC})
  add_subdirectory(tests)
  add_subdirectory(benchmarks)
endif()

This is incorrect as we should check VELOX_BUILD_TEST_UTILS variable when deciding whether to build VectorTestBase.* files (velox_vector_test_lib).

This is not a problem with velox_exec_test_util, which contains OperatorTestBase.*, because that library is built even if VELOX_BUILD_TESTING is off, as long as VELOX_BUILD_TEST_UTILS is on.

presto-native-execution/velox/velox/exec/CMakeLists.txt

if(${VELOX_BUILD_TESTING})
  add_subdirectory(tests)
  add_subdirectory(benchmarks)
elseif(${VELOX_BUILD_TEST_UTILS})
  add_subdirectory(tests/utils)
endif()

When building Prestissimo, we set VELOX_BUILD_TESTING OFF and VELOX_BUILD_TEST_UTILS ON.

presto-native-execution/CMakeLists.txt

set(VELOX_BUILD_TESTING
    OFF
    CACHE BOOL "Enable Velox tests")

option(PRESTO_ENABLE_TESTING "Enable tests" ON)

set(VELOX_BUILD_TEST_UTILS
    ${PRESTO_ENABLE_TESTING}
    CACHE BOOL "Enable Velox test utils")

A fix could be to call add_subdirectory(tests) unconditionally in vector/CMakeLists.txt, then build velox_vector_test_lib if VELOX_BUILD_TEST_UTILS is ON and the rest of the tests if VELOX_BUILD_TESTING is ON. Alternatively, we could move VectorMaker.* and VectorTestBase.* into a separate directory and call add_subdirectory for that if VELOX_BUILD_TEST_UTILS is ON.

CC: @majetideepak @kgpai

Contributor Author

@mbasmanova I tried solution #1 and it worked! So I'm using vector maker and also assertEqualVectors in this PR as well. I'm preparing a Velox PR for that change too.

Contributor

This method is not needed. Please, remove.

return result;
}

std::shared_ptr<const RowType> rowType(
Contributor

This method and a few following methods appear not used. Would you double check?

Contributor Author

Actually only this method.

Contributor

This method still needs to be deleted.

@beroyfb beroyfb force-pushed the sapphire-shuffle-operators branch from 4d81db3 to a9c03ae Compare September 12, 2022 23:56
auto numPartitions = planNode->numPartitions();
VELOX_CHECK(!planNode->keys().empty(), "Empty keys for hive hash!");
std::vector<int> bucketToPartition(numPartitions);
for (int i = 0; i < numPartitions; ++i) {
Contributor

nit: consider using std::iota for brevity

std::iota(bucketToPartition.begin(), bucketToPartition.end(), 0);
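
Editorial sketch of the std::iota suggestion. It assumes the loop body, which is not shown in the excerpt, assigns `bucketToPartition[i] = i`; `makeIdentityMapping` is a hypothetical helper name.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Builds the identity bucket-to-partition mapping: bucket i maps to
// partition i. std::iota fills the range with 0, 1, 2, ... in order.
std::vector<int> makeIdentityMapping(int numPartitions) {
  std::vector<int> bucketToPartition(numPartitions);
  std::iota(bucketToPartition.begin(), bucketToPartition.end(), 0);
  return bucketToPartition;
}
```

Note that std::iota lives in `<numeric>`, not `<algorithm>`.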

#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/exec/HashPartitionFunction.h"
#include "velox/expression/Expr.h"
Contributor

Is this #include needed?

Contributor Author

I'll remove it.

@beroyfb beroyfb force-pushed the sapphire-shuffle-operators branch 2 times, most recently from 32efc6a to 387ca07 Compare September 13, 2022 16:52
std::vector<VectorPtr>{});
}

RowVectorPtr makeRowVector(
Contributor

This method should not be needed. We should use the implementation from the base class. Then, 'rowVector' method won't be needed as well.

Contributor

Same.

@beroyfb beroyfb force-pushed the sapphire-shuffle-operators branch 3 times, most recently from ee9dad1 to 349a638 Compare September 14, 2022 06:10
@mbasmanova mbasmanova changed the title from "Operators for unsaferow shuffle" to "[native] Operators for unsaferow shuffle" Sep 14, 2022
@beroyfb beroyfb force-pushed the sapphire-shuffle-operators branch 4 times, most recently from 25396cc to 49299e0 Compare September 20, 2022 18:10
@beroyfb beroyfb requested a review from a team as a code owner September 20, 2022 18:10
@beroyfb beroyfb requested a review from ChunxuTang September 20, 2022 18:10
Contributor

@mbasmanova mbasmanova left a comment

@beroyfb Behnam, thank you for iterating on this PR. Some comments for the first commit.

velox_hive_partition_function
velox_vector)

add_subdirectory(tests)
Contributor

Wrap this in if(PRESTO_ENABLE_TESTING)

velox::ROW(
{"partition", "data"}, {velox::INTEGER(), velox::VARBINARY()})
->equivalent(*outputType_));
VELOX_CHECK(!keys_.empty(), "Empty keys for hive hash!");
Contributor

VELOX_USER_CHECK + remove ! from the message (no need to scream at the user)

}
} // namespace

class UnsaferowShuffleTest : public exec::test::OperatorTestBase {
Contributor

Still needs to be fixed.

return result;
}

std::shared_ptr<const RowType> rowType(
Contributor

This method still needs to be deleted.

return ROW(std::move(names), std::move(types));
}

RowVectorPtr rowVector(const std::vector<VectorPtr>& children) {
Contributor

This method should not be needed. It is available in the base class.

std::vector<VectorPtr>{});
}

RowVectorPtr makeRowVector(
Contributor

Same.

return rowVec;
}

void assertEqualVectors(const VectorPtr& expected, const VectorPtr& actual) {
Contributor

This method is not needed. Please, remove.

}
}

TEST_F(UnsafeRowShuffleTest, partitionAndSerializeStringTest) {
Contributor

naming: partitionAndSerializeStringTest -> partitionAndSerializeToString

}

TEST_F(UnsafeRowShuffleTest, partitionAndSerializeStringTest) {
exec::Operator::registerOperator(
Contributor

This is not needed

.planNode();

ASSERT_EQ(
plan->toString(true, false),
Contributor

Would you also add a test case with toString(false, false) and toString(true, true)?

Contributor

@mbasmanova mbasmanova left a comment

2nd commit looks good, modulo one request.

virtual void collect(int32_t partition, std::string_view data) = 0;

// Tell the shuffle system the writer is done.
// The input parameter success set to false indicate aborted client.
Contributor

Use @param

virtual bool hasNext(int32_t partition) const = 0;

// Read the next block of data for this partition.
virtual velox::BufferPtr next(int32_t partition) = 0;
Contributor

Do we need a method to tell the reader we are done reading (either successfully or not)?
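
Editorial sketch of what such a reader-side method could look like. Everything beyond `hasNext` and `next` is an assumption: the `noMoreData(bool success)` name is borrowed from the writer side for symmetry, `std::string` stands in for `velox::BufferPtr`, and `InMemoryShuffleReader` is a hypothetical implementation. The thread does not say what the PR finally adopted.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>

// Hypothetical reader-side interface with an explicit "done" notification.
class ShuffleReaderSketch {
 public:
  virtual ~ShuffleReaderSketch() = default;

  // Check by the reader to see if more blocks are available for this
  // partition.
  virtual bool hasNext(int32_t partition) const = 0;

  // Read the next block of data for this partition. std::string stands in
  // for velox::BufferPtr in this sketch.
  virtual std::string next(int32_t partition) = 0;

  // Assumed addition: tell the shuffle system the reader is done, either
  // successfully (true) or because of an error (false).
  virtual void noMoreData(bool success) = 0;
};

// Hypothetical in-memory implementation used only to illustrate the contract.
class InMemoryShuffleReader : public ShuffleReaderSketch {
 public:
  explicit InMemoryShuffleReader(
      std::map<int32_t, std::deque<std::string>> blocks)
      : blocks_(std::move(blocks)) {}

  bool hasNext(int32_t partition) const override {
    auto it = blocks_.find(partition);
    return !closed_ && it != blocks_.end() && !it->second.empty();
  }

  // Precondition: hasNext(partition) returned true.
  std::string next(int32_t partition) override {
    auto& queue = blocks_.at(partition);
    std::string block = std::move(queue.front());
    queue.pop_front();
    return block;
  }

  void noMoreData(bool success) override {
    closed_ = true;
    success_ = success;
    // Release any unread blocks once the reader has signalled completion.
    blocks_.clear();
  }

  bool closed() const {
    return closed_;
  }

  bool succeeded() const {
    return success_;
  }

 private:
  std::map<int32_t, std::deque<std::string>> blocks_;
  bool closed_ = false;
  bool success_ = false;
};
```

An explicit notification gives the shuffle system a point at which it can release resources held for the reader, instead of inferring completion from `hasNext` returning false.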


namespace facebook::presto::operators {

class ShuffleWriteNode : public velox::core::PlanNode {
Contributor

Would you add a test for ShuffleWriteNode->toString()?

Contributor

@mbasmanova mbasmanova left a comment

Remaining comments for the 3rd commit.

@@ -92,4 +92,5 @@ jobs:
./mvnw install ${MAVEN_FAST_INSTALL} -pl '!presto-docs,!presto-server,!presto-server-rpm'

- name: Run e2e tests
rm -rf /tmp/hive_data/tpch/
Contributor

Is this needed? If so, can you extract this into a separate commit ?

@@ -212,83 +232,17 @@ class UnsaferowShuffleTest : public exec::test::OperatorTestBase {
serializer->deserialize(input.get(), pool(), rowType, &result, nullptr);
return result;
}

std::shared_ptr<const RowType> rowType(
Contributor

Would you move these deletes to the first commit to avoid code churn?

exec::Operator::registerOperator(
std::make_unique<PartitionAndSerializeTranslator>());

auto data = makeRowVector({
auto data = vectorMaker_.rowVector({
Contributor

It would be possible to use makeRowVector here. Is this not working?

@beroyfb beroyfb force-pushed the sapphire-shuffle-operators branch from 49299e0 to 8fd55a8 Compare September 21, 2022 01:41
@beroyfb beroyfb force-pushed the sapphire-shuffle-operators branch from 8fd55a8 to 6068e73 Compare September 21, 2022 06:17
Contributor

@mbasmanova mbasmanova left a comment

@beroyfb Looks good to me. Thank you for working on adding shuffle support to Sapphire on Velox.

PartitionAndSerialize partitions and also serializes rows in the row
vector as a vector of string (each string holding a serialized row).
It uses hive partition function. Later we may add murmur3 hash as well.
ShuffleWrite writes the result of serialized vectors into an external shuffle system.
Also adding external shuffle interface.
Also an end2end test simulating an external shuffle service
for unsaferow blocks.
@beroyfb beroyfb force-pushed the sapphire-shuffle-operators branch from 6068e73 to 6c51998 Compare September 21, 2022 14:16
@spershin spershin merged commit a6e84c0 into prestodb:master Sep 21, 2022

3 participants