-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[native] Operators for unsaferow shuffle #18327
Conversation
d94ec59
to
4d81db3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@beroyfb Looks good overall. This is a large change with many separate pieces. It would be nice to break it up into multiple commits. Perhaps,
- Add PartitionAndSerialize operator.
- Add UnsafeRowExchangeSource.
- Add ShuffleInterface.
- Add ShuffleWrite operator.
|
||
// Initializing hive partition function | ||
auto numPartitions = planNode->numPartitions(); | ||
VELOX_CHECK(!planNode->keys().empty(), "Empty keys for hive hash!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps, move this check to plan node constructor to fail earlier.
auto inputType = planNode->sources()[0]->outputType(); | ||
auto keyChannels = toChannels(inputType, planNode->keys()); | ||
|
||
// Initializing hive partition function |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments should be full sentences, e.g. start with a capital letter and end with a period. Please, fix throughout the PR.
presto-native-execution/presto_cpp/main/operators/PartitionAndSerialize.cpp
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/PartitionAndSerialize.cpp
Show resolved
Hide resolved
// Write to the shuffle one row at a time | ||
virtual void collect(int32_t partition, std::string_view data) = 0; | ||
// Tell the shuffle system the writer is done | ||
virtual void noMoreData() = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to pass a boolean flag indicating whether we are closing normally or aborting on error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. You are right.
public: | ||
// Write to the shuffle one row at a time | ||
virtual void collect(int32_t partition, std::string_view data) = 0; | ||
// Tell the shuffle system the writer is done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add empty lines between methods.
} | ||
} // namespace | ||
|
||
class UnsaferowShuffleTest : public exec::test::OperatorTestBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: Unsaferow -> UnsafeRow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still needs to be fixed.
} | ||
|
||
for (auto& result : serializedResults) { | ||
std::cout << "Partition: " << i << ": " << result->toString() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove print statements from the tests.
|
||
namespace facebook::presto::operators { | ||
|
||
class UnsafeRowExchangeSource : public velox::exec::ExchangeSource { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in the file name: Unsaferow -> UnsafeRow.
auto plan = | ||
exec::test::PlanBuilder() | ||
.values({data}, true) | ||
.addNode( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use addPartitionAndSerializeNode
return rowVec; | ||
} | ||
|
||
void assertEqualVectors(const VectorPtr& expected, const VectorPtr& actual) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the existing function from velox/vector/tests/VectorTestBase.h ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried but given that it doesn't seem to be reference-able in presto_cpp
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't seem to be reference-able in presto_cpp
Hmm... what does that mean? What happens if you try using this function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The body of assertEqualVectors
is located under VeloxVectorTest.cpp
which is under velox_vector_test_lib
. When I add that lib to CMake file I get a link error:
ld: library not found for -lvelox_vector_test_lib
clang: error: linker command failed with exit code 1 (use -v to see invocation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha... This needs to be fixed.
In Velox, there are 2 build variables that control what test code is built: VELOX_BUILD_TESTING and VELOX_BUILD_TEST_UTILS. VELOX_BUILD_TESTING is intended to allow for disabling building of tests (to speed up builds). VELOX_BUILD_TEST_UTILS is intended to allow for disabling building test utilities.
VectorTestBase.* files are test utilities, however, their build is guarded by VELOX_BUILD_TESTING, not VELOX_BUILD_TEST_UTILS.
presto-native-execution/velox/velox/vector/CMakeLists.txt
if(${VELOX_BUILD_TESTING} OR ${VELOX_ENABLE_BENCHMARKS_BASIC})
add_subdirectory(tests)
add_subdirectory(benchmarks)
endif()
This is incorrect as we should check VELOX_BUILD_TEST_UTILS variable when deciding whether to build VectorTestBase.* files (velox_vector_test_lib).
This is not a problem with velox_exec_test_util that contains OperatorTestBase.* because that is being build even if VELOX_BUILD_TESTING is off, but VELOX_BUILD_TEST_UTILS is on.
presto-native-execution/velox/velox/exec/CMakeLists.txt
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
add_subdirectory(benchmarks)
elseif(${VELOX_BUILD_TEST_UTILS})
add_subdirectory(tests/utils)
endif()
When building Prestissimo, we set VELOX_BUILD_TESTING OFF and VELOX_BUILD_TEST_UTILS ON.
presto-native-execution/CMakeLists.txt
set(VELOX_BUILD_TESTING
OFF
CACHE BOOL "Enable Velox tests")
option(PRESTO_ENABLE_TESTING "Enable tests" ON)
set(VELOX_BUILD_TEST_UTILS
${PRESTO_ENABLE_TESTING}
CACHE BOOL "Enable Velox test utils")
A fix could be to move to call add_subdirectory(tests)
unconditionally in vector/CMakeLists.txt, then build velox_vector_test_lib if VELOX_BUILD_TEST_UTILS is ON and the rest of the tests if VELOX_BUILD_TESTING is ON. Alternatively, we could move VectorMaker.* and VectorTestBase.* into a separate directory and call add_directory
for that if VELOX_BUILD_TEST_UTILS is ON.
CC: @majetideepak @kgpai
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mbasmanova I tried the solution #1 and it worked! So I'm using vector maker and also assertEqualityVectors ... in this PR as well. Preparing a Velox PR for that change too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not needed. Please, remove.
return result; | ||
} | ||
|
||
std::shared_ptr<const RowType> rowType( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method and a few following methods appear not used. Would you double check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually only this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method still needs to be deleted.
4d81db3
to
a9c03ae
Compare
auto numPartitions = planNode->numPartitions(); | ||
VELOX_CHECK(!planNode->keys().empty(), "Empty keys for hive hash!"); | ||
std::vector<int> bucketToPartition(numPartitions); | ||
for (int i = 0; i < numPartitions; ++i) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consider using std::iota for brevity
std::iota(bucketToPartition.begin(), bucketToPartition.end(), 0);
#include "presto_cpp/main/operators/PartitionAndSerialize.h" | ||
#include "velox/connectors/hive/HivePartitionFunction.h" | ||
#include "velox/exec/HashPartitionFunction.h" | ||
#include "velox/expression/Expr.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this #include needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll remove it.
32efc6a
to
387ca07
Compare
std::vector<VectorPtr>{}); | ||
} | ||
|
||
RowVectorPtr makeRowVector( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method should not be needed. We should use the implementation from the base class. Then, 'rowVector' method won't be needed as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same.
ee9dad1
to
349a638
Compare
25396cc
to
49299e0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@beroyfb Behnam, thank you for iterating on this PR. Some comments for the first commit.
velox_hive_partition_function | ||
velox_vector) | ||
|
||
add_subdirectory(tests) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrap this in if(PRESTO_ENABLE_TESTING)
velox::ROW( | ||
{"partition", "data"}, {velox::INTEGER(), velox::VARBINARY()}) | ||
->equivalent(*outputType_)); | ||
VELOX_CHECK(!keys_.empty(), "Empty keys for hive hash!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
VELOX_USER_CHECK + remove ! from the message (no need to scream at the user)
} | ||
} // namespace | ||
|
||
class UnsaferowShuffleTest : public exec::test::OperatorTestBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still needs to be fixed.
return result; | ||
} | ||
|
||
std::shared_ptr<const RowType> rowType( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method still needs to be deleted.
return ROW(std::move(names), std::move(types)); | ||
} | ||
|
||
RowVectorPtr rowVector(const std::vector<VectorPtr>& children) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method should not be needed. It is available in the base class.
std::vector<VectorPtr>{}); | ||
} | ||
|
||
RowVectorPtr makeRowVector( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same.
return rowVec; | ||
} | ||
|
||
void assertEqualVectors(const VectorPtr& expected, const VectorPtr& actual) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not needed. Please, remove.
} | ||
} | ||
|
||
TEST_F(UnsafeRowShuffleTest, partitionAndSerializeStringTest) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naming: partitionAndSerializeStringTest -> partitionAndSerializeToString
} | ||
|
||
TEST_F(UnsafeRowShuffleTest, partitionAndSerializeStringTest) { | ||
exec::Operator::registerOperator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not needed
.planNode(); | ||
|
||
ASSERT_EQ( | ||
plan->toString(true, false), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you also add a test case with toString(false, false) and toString(true, true)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2nd commit looks good % one request.
virtual void collect(int32_t partition, std::string_view data) = 0; | ||
|
||
// Tell the shuffle system the writer is done. | ||
// The input parameter success set to false indicate aborted client. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use @param
virtual bool hasNext(int32_t partition) const = 0; | ||
|
||
// Read the next block of data for this partition. | ||
virtual velox::BufferPtr next(int32_t partition) = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a method to tell the reader we are done reading (either successfully or not)?
|
||
namespace facebook::presto::operators { | ||
|
||
class ShuffleWriteNode : public velox::core::PlanNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you add a test for ShuffleWriteNode->toString()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remaining comments for the 3rd commit.
.github/workflows/test-native.yml
Outdated
@@ -92,4 +92,5 @@ jobs: | |||
./mvnw install ${MAVEN_FAST_INSTALL} -pl '!presto-docs,!presto-server,!presto-server-rpm' | |||
|
|||
- name: Run e2e tests | |||
rm -rf /tmp/hive_data/tpch/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? If so, can you extract this into a separate commit ?
@@ -212,83 +232,17 @@ class UnsaferowShuffleTest : public exec::test::OperatorTestBase { | |||
serializer->deserialize(input.get(), pool(), rowType, &result, nullptr); | |||
return result; | |||
} | |||
|
|||
std::shared_ptr<const RowType> rowType( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you move these deletes to the first commit to avoid code churn?
exec::Operator::registerOperator( | ||
std::make_unique<PartitionAndSerializeTranslator>()); | ||
|
||
auto data = makeRowVector({ | ||
auto data = vectorMaker_.rowVector({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be possible to use makeRowVector here. Is this not working?
49299e0
to
8fd55a8
Compare
8fd55a8
to
6068e73
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@beroyfb Looks good to me. Thank you for working on adding shuffle support to Sapphire on Velox.
PartitionAndSerialize partitions and also serializes rows in the row vector as a vector of string (each string holding a serialized row). It uses hive partition function. Later we may add murmur3 hash as well.
ShuffleWrite writes the result of serialized vectors into an external shuffle system. Also adding external shuffle interface.
Also an end2end test simulating an external shuffle service for unsaferow blocks.
6068e73
to
6c51998
Compare
The new operators include:
and also serializes rows in the row vector as a vector of string (each string holding a serialized row). It uses hive partition function. Later we may add murmur3 hash as well.
Test plan - presto_cpp/main/operators/tests/UnsaferowShuffleTest.cpp is an end-2-end test including a basic shuffle and all the new operators
== NO RELEASE NOTE ==