From cef59b753cd7a85cf61f9ba20f601dcef57648f6 Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Wed, 9 Oct 2024 22:10:37 -0500 Subject: [PATCH] Run TPC-H with cuDF (#24) * Enable cudf in TPC-H queries. * Draft work on strings conversion. * Improved DATE / logical type support. * Fix for join build to allow multiple batches. * Add environment variables VELOX_CUDF_DISABLED and VELOX_CUDF_DEBUG. * Apply tuning parameters to TPC-H queries to use larger batches when cuDF is enabled. * Add env vars for disabling cudf / enabling debug. * num_drivers=1, num_repeats=1 * Don't reset hashObject_ and only set finished_ to true if noMoreInput_ is true. * Apply tuning parameters to cuDF engine. --------- Co-authored-by: Karthikeyan Natarajan --- benchmark.sh | 25 +- velox/benchmarks/tpch/CMakeLists.txt | 1 + velox/benchmarks/tpch/TpchBenchmark.cpp | 13 + velox/experimental/cudf/exec/CMakeLists.txt | 3 +- velox/experimental/cudf/exec/CudfHashJoin.cpp | 255 +++++++++++++----- velox/experimental/cudf/exec/ToCudf.cpp | 80 ++++-- velox/experimental/cudf/exec/ToCudf.h | 3 + velox/experimental/cudf/exec/Utilities.cpp | 27 ++ velox/experimental/cudf/exec/Utilities.h | 23 ++ .../cudf/exec/VeloxCudfInterop.cpp | 235 +++++++++++----- .../experimental/cudf/tests/HashJoinTest.cpp | 11 +- 11 files changed, 499 insertions(+), 177 deletions(-) create mode 100644 velox/experimental/cudf/exec/Utilities.cpp create mode 100644 velox/experimental/cudf/exec/Utilities.h diff --git a/benchmark.sh b/benchmark.sh index 92083d949160..29d610e5e7ff 100755 --- a/benchmark.sh +++ b/benchmark.sh @@ -25,8 +25,29 @@ set -euo pipefail # Run a GPU build and test pushd "$(dirname ${0})" -#CUDA_ARCHITECTURES="native" EXTRA_CMAKE_FLAGS="-DVELOX_ENABLE_ARROW=ON -DVELOX_ENABLE_PARQUET=ON -DVELOX_ENABLE_BENCHMARKS=ON -DVELOX_ENABLE_BENCHMARKS_BASIC=ON" make gpu +mkdir -p benchmark_results -./_build/release/velox/benchmarks/tpch/velox_tpch_benchmark --data_path=velox-tpch-sf10-data --data_format=parquet --run_query_verbose=5 --num_repeats=6 +queries=${1:-$(seq 1 20)} +devices=${2:-"cpu gpu"} + + +for query_number in ${queries}; do + printf -v query_number '%02d' "${query_number}" + for device in ${devices}; do + case "${device}" in + "cpu") + num_drivers=40 + export VELOX_CUDF_DISABLED=1;; + "gpu") + num_drivers=1 + export VELOX_CUDF_DISABLED=0;; + esac + echo "Running query ${query_number} on ${device} with ${num_drivers} drivers." + # The benchmarks segfault after reporting results, so we disable errors + set +e + ./_build/release/velox/benchmarks/tpch/velox_tpch_benchmark --data_path=velox-tpch-sf10-data --data_format=parquet --run_query_verbose=${query_number} --num_repeats=1 --num_drivers ${num_drivers} 2>&1 | tee benchmark_results/q${query_number}_${device}_${num_drivers}_drivers + set -e + done +done popd diff --git a/velox/benchmarks/tpch/CMakeLists.txt b/velox/benchmarks/tpch/CMakeLists.txt index ba7491f7e603..74e2a9835c04 100644 --- a/velox/benchmarks/tpch/CMakeLists.txt +++ b/velox/benchmarks/tpch/CMakeLists.txt @@ -17,6 +17,7 @@ add_library(velox_tpch_benchmark_lib TpchBenchmark.cpp) target_link_libraries( velox_tpch_benchmark_lib velox_aggregates + velox_cudf_exec velox_exec velox_exec_test_lib velox_dwio_common diff --git a/velox/benchmarks/tpch/TpchBenchmark.cpp b/velox/benchmarks/tpch/TpchBenchmark.cpp index 413bd82e4a5d..0de04f0b7f8e 100644 --- a/velox/benchmarks/tpch/TpchBenchmark.cpp +++ b/velox/benchmarks/tpch/TpchBenchmark.cpp @@ -35,6 +35,7 @@ #include "velox/exec/Split.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/TpchQueryBuilder.h" +#include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/parse/TypeResolver.h" @@ -274,9 +275,13 @@ class TpchBenchmark { connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector(kHiveConnectorId, properties, ioExecutor_.get()); connector::registerConnector(hiveConnector); + + // Enable cuDF operators + cudf_velox::registerCudf(); } void shutdown() { + cudf_velox::unregisterCudf(); cache_->shutdown(); } @@ -290,6 +295,14 @@ class TpchBenchmark { params.planNode = tpchPlan.plan; params.queryConfigs[core::QueryConfig::kMaxSplitPreloadPerDriver] = std::to_string(FLAGS_split_preload_per_driver); + if (cudf_velox::cudfIsRegistered()) { + params.queryConfigs[core::QueryConfig::kPreferredOutputBatchBytes] = + "536870912"; // 512 MB + params.queryConfigs[core::QueryConfig::kPreferredOutputBatchRows] = + "500000"; + params.queryConfigs[core::QueryConfig::kMaxOutputBatchRows] = + "500000"; + } const int numSplitsPerFile = FLAGS_num_splits_per_file; bool noMoreSplits = false; diff --git a/velox/experimental/cudf/exec/CMakeLists.txt b/velox/experimental/cudf/exec/CMakeLists.txt index 4c9ea2efb482..6877a1117f5b 100644 --- a/velox/experimental/cudf/exec/CMakeLists.txt +++ b/velox/experimental/cudf/exec/CMakeLists.txt @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_cudf_exec CudfHashJoin.cpp ToCudf.cpp VeloxCudfInterop.cpp) +add_library(velox_cudf_exec CudfHashJoin.cpp ToCudf.cpp Utilities.cpp + VeloxCudfInterop.cpp) set_target_properties( velox_cudf_exec diff --git a/velox/experimental/cudf/exec/CudfHashJoin.cpp b/velox/experimental/cudf/exec/CudfHashJoin.cpp index 409c7db26da6..a1092207a08e 100644 --- a/velox/experimental/cudf/exec/CudfHashJoin.cpp +++ b/velox/experimental/cudf/exec/CudfHashJoin.cpp @@ -23,19 +23,24 @@ #include "velox/exec/Task.h" #include "velox/vector/ComplexVector.h" +#include #include #include +#include #include -#include "CudfHashJoin.h" -#include "VeloxCudfInterop.h" +#include "velox/experimental/cudf/exec/CudfHashJoin.h" +#include "velox/experimental/cudf/exec/Utilities.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" namespace facebook::velox::cudf_velox { void CudfHashJoinBridge::setHashTable( std::optional hashObject) { - std::cout << "Calling CudfHashJoinBridge::setHashTable" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBridge::setHashTable" << std::endl; + } std::vector promises; { std::lock_guard l(mutex_); @@ -50,19 +55,27 @@ void CudfHashJoinBridge::setHashTable( std::optional CudfHashJoinBridge::hashOrFuture( ContinueFuture* future) { - std::cout << "Calling CudfHashJoinBridge::hashOrFuture" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBridge::hashOrFuture" << std::endl; + } std::lock_guard l(mutex_); if (hashObject_.has_value()) { return std::move(hashObject_); } - std::cout << "Calling CudfHashJoinBridge::hashOrFuture constructing promise" - << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBridge::hashOrFuture constructing promise" + << std::endl; + } promises_.emplace_back("CudfHashJoinBridge::hashOrFuture"); - std::cout << "Calling CudfHashJoinBridge::hashOrFuture getSemiFuture" - << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBridge::hashOrFuture getSemiFuture" + << std::endl; + } *future = promises_.back().getSemiFuture(); - std::cout << "Calling CudfHashJoinBridge::hashOrFuture returning nullopt" - << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBridge::hashOrFuture returning nullopt" + << std::endl; + } return std::nullopt; } @@ -78,34 +91,39 @@ CudfHashJoinBuild::CudfHashJoinBuild( joinNode->id(), "CudfHashJoinBuild"), joinNode_(joinNode) { - std::cout << "CudfHashJoinBuild constructor" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "CudfHashJoinBuild constructor" << std::endl; + } } void CudfHashJoinBuild::addInput(RowVectorPtr input) { - std::cout << "Calling CudfHashJoinBuild::addInput" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBuild::addInput" << std::endl; + } // Queue inputs, process all at once. // TODO distribute work equally. - auto inputSize = input->size(); - if (inputSize > 0) { + if (input->size() > 0) { inputs_.push_back(std::move(input)); } } bool CudfHashJoinBuild::needsInput() const { - std::cout << "Calling CudfHashJoinBuild::needsInput" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBuild::needsInput" << std::endl; + } return !noMoreInput_; } RowVectorPtr CudfHashJoinBuild::getOutput() { - std::cout << "Calling CudfHashJoinBuild::getOutput" << std::endl; return nullptr; } void CudfHashJoinBuild::noMoreInput() { - std::cout << "Calling CudfHashJoinBuild::noMoreInput" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBuild::noMoreInput" << std::endl; + } NVTX3_FUNC_RANGE(); Operator::noMoreInput(); - // TODO std::vector promises; std::vector> peers; // Only last driver collects all answers @@ -120,11 +138,28 @@ void CudfHashJoinBuild::noMoreInput() { VELOX_CHECK(build); inputs_.insert(inputs_.end(), build->inputs_.begin(), build->inputs_.end()); } - // TODO build hash table - auto tbl = to_cudf_table(inputs_[0]); - std::cout << "Build table number of columns: " << tbl->num_columns() - << std::endl; - std::cout << "Build table number of rows: " << tbl->num_rows() << std::endl; + + auto cudf_tables = std::vector>(inputs_.size()); + auto cudf_table_views = std::vector(inputs_.size()); + for (int i = 0; i < inputs_.size(); i++) { + VELOX_CHECK_NOT_NULL(inputs_[i]); + cudf_tables[i] = to_cudf_table(inputs_[i]); + cudf_table_views[i] = cudf_tables[i]->view(); + } + auto tbl = cudf::concatenate(cudf_table_views); + + // Release input data + cudf::get_default_stream().synchronize(); + cudf_table_views.clear(); + cudf_tables.clear(); + inputs_.clear(); + + VELOX_CHECK_NOT_NULL(tbl); + if (cudfDebugEnabled()) { + std::cout << "Build table number of columns: " << tbl->num_columns() + << std::endl; + std::cout << "Build table number of rows: " << tbl->num_rows() << std::endl; + } auto buildType = joinNode_->sources()[1]->outputType(); auto buildKeys = joinNode_->rightKeys(); @@ -137,6 +172,14 @@ void CudfHashJoinBuild::noMoreInput() { auto hashObject = std::make_shared( tbl->view().select(build_key_indices), cudf::null_equality::EQUAL); + VELOX_CHECK_NOT_NULL(hashObject); + if (cudfDebugEnabled()) { + if (hashObject != nullptr) { + printf("hashObject is not nullptr %p\n", hashObject.get()); + } else { + printf("hashObject is *** nullptr\n"); + } + } // Copied peers.clear(); @@ -154,9 +197,13 @@ void CudfHashJoinBuild::noMoreInput() { } exec::BlockingReason CudfHashJoinBuild::isBlocked(ContinueFuture* future) { - std::cout << "Calling CudfHashJoinBuild::isBlocked" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBuild::isBlocked" << std::endl; + } if (!future_.valid()) { - std::cout << "CudfHashJoinBuild future is not valid" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "CudfHashJoinBuild future is not valid" << std::endl; + } return exec::BlockingReason::kNotBlocked; } *future = std::move(future_); @@ -164,7 +211,9 @@ exec::BlockingReason CudfHashJoinBuild::isBlocked(ContinueFuture* future) { } bool CudfHashJoinBuild::isFinished() { - std::cout << "Calling CudfHashJoinBuild::isFinished" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBuild::isFinished" << std::endl; + } return !future_.valid() && noMoreInput_; } @@ -179,20 +228,29 @@ CudfHashJoinProbe::CudfHashJoinProbe( joinNode->id(), "CudfHashJoinProbe"), joinNode_(joinNode) { - std::cout << "CudfHashJoinProbe constructor" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "CudfHashJoinProbe constructor" << std::endl; + } } bool CudfHashJoinProbe::needsInput() const { - std::cout << "Calling CudfHashJoinProbe::needsInput" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinProbe::needsInput" << std::endl; + } return !finished_ && input_ == nullptr; } + void CudfHashJoinProbe::addInput(RowVectorPtr input) { - std::cout << "Calling CudfHashJoinProbe::addInput" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinProbe::addInput" << std::endl; + } input_ = std::move(input); } RowVectorPtr CudfHashJoinProbe::getOutput() { - std::cout << "Calling CudfHashJoinProbe::getOutput" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinProbe::getOutput" << std::endl; + } NVTX3_FUNC_RANGE(); if (!input_) { return nullptr; @@ -203,31 +261,37 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { } // TODO convert input to cudf table auto tbl = to_cudf_table(input_); - std::cout << "Probe table number of columns: " << tbl->num_columns() - << std::endl; - std::cout << "Probe table number of rows: " << tbl->num_rows() << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Probe table number of columns: " << tbl->num_columns() + << std::endl; + std::cout << "Probe table number of rows: " << tbl->num_rows() << std::endl; + } auto probeType = joinNode_->sources()[0]->outputType(); auto buildType = joinNode_->sources()[1]->outputType(); auto probeKeys = joinNode_->leftKeys(); auto buildKeys = joinNode_->rightKeys(); - for (int i = 0; i < probeType->names().size(); i++) { - std::cout << "Left column " << i << ": " << probeType->names()[i] - << std::endl; - } + if (cudfDebugEnabled()) { + for (int i = 0; i < probeType->names().size(); i++) { + std::cout << "Left column " << i << ": " << probeType->names()[i] + << std::endl; + } - for (int i = 0; i < buildType->names().size(); i++) { - std::cout << "Right column " << i << ": " << buildType->names()[i] - << std::endl; - } + for (int i = 0; i < buildType->names().size(); i++) { + std::cout << "Right column " << i << ": " << buildType->names()[i] + << std::endl; + } - for (int i = 0; i < probeKeys.size(); i++) { - std::cout << "Left key " << i << ": " << probeKeys[i]->name() << std::endl; - } + for (int i = 0; i < probeKeys.size(); i++) { + std::cout << "Left key " << i << ": " << probeKeys[i]->name() << " " + << probeKeys[i]->type()->kind() << std::endl; + } - for (int i = 0; i < buildKeys.size(); i++) { - std::cout << "Right key " << i << ": " << buildKeys[i]->name() << std::endl; + for (int i = 0; i < buildKeys.size(); i++) { + std::cout << "Right key " << i << ": " << buildKeys[i]->name() << " " + << buildKeys[i]->type()->kind() << std::endl; + } } auto probe_key_indices = std::vector(probeKeys.size()); @@ -239,9 +303,24 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { // TODO pass the input pool !!! // TODO: We should probably subset columns before calling to_cudf_table? // Maybe that isn't a problem if we fuse operators together. + auto& tb = hashObject_.value().first; + auto& hb = hashObject_.value().second; + VELOX_CHECK_NOT_NULL(tb); + VELOX_CHECK_NOT_NULL(hb); + if (cudfDebugEnabled()) { + if (tb != nullptr) + printf( + "tb is not nullptr %p hasValue(%d)\n", + tb.get(), + hashObject_.has_value()); + if (hb != nullptr) + printf( + "hb is not nullptr %p hasValue(%d)\n", + hb.get(), + hashObject_.has_value()); + } auto const [left_join_indices, right_join_indices] = - hashObject_.value().second->inner_join( - tbl->view().select(probe_key_indices)); + hb->inner_join(tbl->view().select(probe_key_indices)); auto left_indices_span = cudf::device_span{*left_join_indices}; auto right_indices_span = @@ -254,7 +333,9 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { auto right_column_output_indices = std::vector(); for (int i = 0; i < outputType->names().size(); i++) { auto const output_name = outputType->names()[i]; - std::cout << "Output column " << i << ": " << output_name << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Output column " << i << ": " << output_name << std::endl; + } auto channel = probeType->getChildIdxIfExists(output_name); if (channel.has_value()) { left_column_indices_to_gather.push_back( @@ -273,14 +354,16 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { "Join field {} not in probe or build input", outputType->children()[i]); } - for (int i = 0; i < left_column_indices_to_gather.size(); i++) { - std::cout << "Left index to gather " << i << ": " - << left_column_indices_to_gather[i] << std::endl; - } + if (cudfDebugEnabled()) { + for (int i = 0; i < left_column_indices_to_gather.size(); i++) { + std::cout << "Left index to gather " << i << ": " + << left_column_indices_to_gather[i] << std::endl; + } - for (int i = 0; i < right_column_indices_to_gather.size(); i++) { - std::cout << "Right index to gather " << i << ": " - << right_column_indices_to_gather[i] << std::endl; + for (int i = 0; i < right_column_indices_to_gather.size(); i++) { + std::cout << "Right index to gather " << i << ": " + << right_column_indices_to_gather[i] << std::endl; + } } auto left_input = tbl->view().select(left_column_indices_to_gather); @@ -293,10 +376,12 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { auto left_result = cudf::gather(left_input, left_indices_col, oob_policy); auto right_result = cudf::gather(right_input, right_indices_col, oob_policy); - std::cout << "Left result number of columns: " << left_result->num_columns() - << std::endl; - std::cout << "Right result number of columns: " << right_result->num_columns() - << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Left result number of columns: " << left_result->num_columns() + << std::endl; + std::cout << "Right result number of columns: " + << right_result->num_columns() << std::endl; + } auto left_cols = left_result->release(); auto right_cols = right_result->release(); @@ -318,25 +403,32 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { } input_.reset(); - finished_ = true; - // printResults(output, std::cout); + finished_ = noMoreInput_; + return output; } exec::BlockingReason CudfHashJoinProbe::isBlocked(ContinueFuture* future) { - std::cout << "Calling CudfHashJoinProbe::isBlocked" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinProbe::isBlocked" << std::endl; + } if (hashObject_.has_value()) { return exec::BlockingReason::kNotBlocked; } auto joinBridge = operatorCtx_->task()->getCustomJoinBridge( operatorCtx_->driverCtx()->splitGroupId, planNodeId()); - auto hashObject = std::dynamic_pointer_cast(joinBridge) - ->hashOrFuture(future); + auto cudf_joinBridge = + std::dynamic_pointer_cast(joinBridge); + VELOX_CHECK_NOT_NULL(cudf_joinBridge); + VELOX_CHECK_NOT_NULL(future); + auto hashObject = cudf_joinBridge->hashOrFuture(future); if (!hashObject.has_value()) { - std::cout << "CudfHashJoinProbe is blocked, waiting for join build" - << std::endl; + if (cudfDebugEnabled()) { + std::cout << "CudfHashJoinProbe is blocked, waiting for join build" + << std::endl; + } return exec::BlockingReason::kWaitForJoinBuild; } hashObject_ = std::move(hashObject); @@ -345,15 +437,26 @@ exec::BlockingReason CudfHashJoinProbe::isBlocked(ContinueFuture* future) { } bool CudfHashJoinProbe::isFinished() { - std::cout << "Calling CudfHashJoinProbe::isFinished" << std::endl; - return finished_ || (noMoreInput_ && input_ == nullptr); + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinProbe::isFinished" << std::endl; + } + auto const is_finished = finished_ || (noMoreInput_ && input_ == nullptr); + + // Release hashObject_ if finished + if (is_finished) { + hashObject_.reset(); + } + return is_finished; } std::unique_ptr CudfHashJoinBridgeTranslator::toOperator( exec::DriverCtx* ctx, int32_t id, const core::PlanNodePtr& node) { - std::cout << "Calling CudfHashJoinBridgeTranslator::toOperator" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBridgeTranslator::toOperator" + << std::endl; + } if (auto joinNode = std::dynamic_pointer_cast(node)) { return std::make_unique(id, ctx, joinNode); @@ -363,8 +466,10 @@ std::unique_ptr CudfHashJoinBridgeTranslator::toOperator( std::unique_ptr CudfHashJoinBridgeTranslator::toJoinBridge( const core::PlanNodePtr& node) { - std::cout << "Calling CudfHashJoinBridgeTranslator::toJoinBridge" - << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBridgeTranslator::toJoinBridge" + << std::endl; + } if (auto joinNode = std::dynamic_pointer_cast(node)) { auto joinBridge = std::make_unique(); @@ -375,8 +480,10 @@ std::unique_ptr CudfHashJoinBridgeTranslator::toJoinBridge( exec::OperatorSupplier CudfHashJoinBridgeTranslator::toOperatorSupplier( const core::PlanNodePtr& node) { - std::cout << "Calling CudfHashJoinBridgeTranslator::toOperatorSupplier" - << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling CudfHashJoinBridgeTranslator::toOperatorSupplier" + << std::endl; + } if (auto joinNode = std::dynamic_pointer_cast(node)) { return [joinNode](int32_t operatorId, exec::DriverCtx* ctx) { diff --git a/velox/experimental/cudf/exec/ToCudf.cpp b/velox/experimental/cudf/exec/ToCudf.cpp index 033115b08922..d4dcb900c1f1 100644 --- a/velox/experimental/cudf/exec/ToCudf.cpp +++ b/velox/experimental/cudf/exec/ToCudf.cpp @@ -22,24 +22,33 @@ #include "velox/exec/HashProbe.h" #include "velox/exec/Operator.h" #include "velox/experimental/cudf/exec/CudfHashJoin.h" +#include "velox/experimental/cudf/exec/Utilities.h" #include namespace facebook::velox::cudf_velox { +static bool _cudfIsRegistered = false; + bool CompileState::compile() { - std::cout << "Calling cudfDriverAdapter" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Calling cudfDriverAdapter" << std::endl; + } + auto operators = driver_.operators(); auto& nodes = planNodes_; - std::cout << "Number of operators: " << operators.size() << std::endl; - for (auto& op : operators) { - std::cout << " Operator: ID " << op->operatorId() << ": " << op->toString() - << std::endl; - } - std::cout << "Number of plan nodes: " << nodes.size() << std::endl; - for (auto& node : nodes) { - std::cout << " Plan node: ID " << node->id() << ": " << node->toString() - << std::endl; + + if (cudfDebugEnabled()) { + std::cout << "Number of operators: " << operators.size() << std::endl; + for (auto& op : operators) { + std::cout << " Operator: ID " << op->operatorId() << ": " + << op->toString() << std::endl; + } + std::cout << "Number of plan nodes: " << nodes.size() << std::endl; + for (auto& node : nodes) { + std::cout << " Plan node: ID " << node->id() << ": " << node->toString() + << std::endl; + } } // Make sure operator states are initialized. We will need to inspect some of @@ -96,22 +105,28 @@ bool CompileState::compile() { struct cudfDriverAdapter { std::shared_ptr>> planNodes; cudfDriverAdapter() { - std::cout << "cudfDriverAdapter constructor" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "cudfDriverAdapter constructor" << std::endl; + } planNodes = std::make_shared>>(); } ~cudfDriverAdapter() { - std::cout << "cudfDriverAdapter destructor" << std::endl; - printf( - "cached planNodes %p, %ld\n", planNodes.get(), planNodes.use_count()); + if (cudfDebugEnabled()) { + std::cout << "cudfDriverAdapter destructor" << std::endl; + printf( + "cached planNodes %p, %ld\n", planNodes.get(), planNodes.use_count()); + } } // driveradapter bool operator()(const exec::DriverFactory& factory, exec::Driver& driver) { auto state = CompileState(factory, driver, *planNodes); // Stored planNodes from inspect. - printf("driver.planNodes=%p\n", planNodes.get()); - for (auto planNode : *planNodes) { - std::cout << "PlanNode: " << (*planNode).toString() << std::endl; + if (cudfDebugEnabled()) { + printf("driver.planNodes=%p\n", planNodes.get()); + for (auto planNode : *planNodes) { + std::cout << "PlanNode: " << (*planNode).toString() << std::endl; + } } auto res = state.compile(); return res; @@ -130,30 +145,47 @@ struct cudfDriverAdapter { // signature: std::function inspect; // call: adapter.inspect(planFragment); planNodes->clear(); - std::cout << "Inspecting PlanFragment: " << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Inspecting PlanFragment" << std::endl; + } if (planNodes) { - printf("inspect.planNodes=%p\n", planNodes.get()); storePlanNodes(planFragment.planNode); - } else { - std::cout << "planNodes_ptr is nullptr" << std::endl; } } }; void registerCudf() { + const char* env_cudf_disabled = std::getenv("VELOX_CUDF_DISABLED"); + if (env_cudf_disabled != nullptr && std::stoi(env_cudf_disabled)) { + return; + } + CUDF_FUNC_RANGE(); cudaFree(0); // to init context. - std::cout << "Registering CudfHashJoinBridgeTranslator" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Registering CudfHashJoinBridgeTranslator" << std::endl; + } exec::Operator::registerOperator( std::make_unique()); - std::cout << "Registering cudfDriverAdapter" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Registering cudfDriverAdapter" << std::endl; + } cudfDriverAdapter cda{}; exec::DriverAdapter cudfAdapter{"cuDF", cda, cda}; exec::DriverFactory::registerAdapter(cudfAdapter); + _cudfIsRegistered = true; } void unregisterCudf() { - std::cout << "unRegistering cudfDriverAdapter" << std::endl; + if (cudfDebugEnabled()) { + std::cout << "Unregistering cudfDriverAdapter" << std::endl; + } exec::DriverFactory::adapters.clear(); + _cudfIsRegistered = false; +} + +bool cudfIsRegistered() { + return _cudfIsRegistered; } + } // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/ToCudf.h b/velox/experimental/cudf/exec/ToCudf.h index 49d6aa50a7d0..8da0eba26ae2 100644 --- a/velox/experimental/cudf/exec/ToCudf.h +++ b/velox/experimental/cudf/exec/ToCudf.h @@ -46,4 +46,7 @@ class CompileState { void registerCudf(); void unregisterCudf(); +/// Returns true if cuDF is registered. +bool cudfIsRegistered(); + } // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/Utilities.cpp b/velox/experimental/cudf/exec/Utilities.cpp new file mode 100644 index 000000000000..62d10fc3c651 --- /dev/null +++ b/velox/experimental/cudf/exec/Utilities.cpp @@ -0,0 +1,27 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace facebook::velox::cudf_velox { + +bool cudfDebugEnabled() { + const char* env_cudf_debug = std::getenv("VELOX_CUDF_DEBUG"); + return env_cudf_debug != nullptr && std::stoi(env_cudf_debug); +} + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/Utilities.h b/velox/experimental/cudf/exec/Utilities.h new file mode 100644 index 000000000000..98394b877891 --- /dev/null +++ b/velox/experimental/cudf/exec/Utilities.h @@ -0,0 +1,23 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::cudf_velox { + +bool cudfDebugEnabled(); + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/VeloxCudfInterop.cpp b/velox/experimental/cudf/exec/VeloxCudfInterop.cpp index 680b6886d173..5e21bc3ec4b0 100644 --- a/velox/experimental/cudf/exec/VeloxCudfInterop.cpp +++ b/velox/experimental/cudf/exec/VeloxCudfInterop.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -37,49 +38,50 @@ #include #include -#include "VeloxCudfInterop.h" +#include "velox/experimental/cudf/exec/Utilities.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" + +#include +#include namespace facebook::velox::cudf_velox { -// Velox type to CUDF type -/* -template -struct VeloxToCudfType { - using type = typename TypeTraits::NativeType; - static constexpr cudf::type_id id = cudf::type_id::EMPTY; - //cudf::type_to_id(); -}; +namespace { -#define VELOX_TO_CUDF_TYPE(CUDF_KIND, VELOX_KIND) \ -template <> \ -struct TypeTraits { \ -using type = typename TypeTraits::NativeType; \ -static constexpr cudf::type_id id = CUDF_KIND; \ -}; +template +constexpr decltype(auto) +vector_encoding_dispatcher(VectorPtr vec, Functor f, Ts&&... args) { + using facebook::velox::VectorEncoding::Simple; + switch (vec->encoding()) { + case Simple::FLAT: + return f(vec->as>(), std::forward(args)...); + case Simple::DICTIONARY: + return f(vec->as>(), std::forward(args)...); + default: { + if (cudfDebugEnabled()) { + std::cout << "Unsupported Velox encoding: " << vec->encoding() + << std::endl; + } + CUDF_FAIL("Unsupported Velox encoding"); + } + } +} + +// TODO: dispatch other duration/timestamp types! +template +using cudf_storage_type_t = std::conditional_t< + std::is_same_v, + cudf::timestamp_D::rep, + cudf::device_storage_type_t>; + +} // namespace -VELOX_TO_CUDF_TYPE(cudf::type_id::BOOL8, BOOLEAN) -VELOX_TO_CUDF_TYPE(cudf::type_id::INT8, TINYINT) -VELOX_TO_CUDF_TYPE(cudf::type_id::INT16, SMALLINT) -VELOX_TO_CUDF_TYPE(cudf::type_id::INT32, INTEGER) -VELOX_TO_CUDF_TYPE(cudf::type_id::INT64, BIGINT) -VELOX_TO_CUDF_TYPE(cudf::type_id::FLOAT32, REAL) -VELOX_TO_CUDF_TYPE(cudf::type_id::FLOAT64, DOUBLE) -VELOX_TO_CUDF_TYPE(cudf::type_id::STRING, VARCHAR) -VELOX_TO_CUDF_TYPE(cudf::type_id::STRING, VARBINARY) -VELOX_TO_CUDF_TYPE(cudf::type_id::TIMESTAMP_NANOSECONDS, TIMESTAMP) -VELOX_TO_CUDF_TYPE(cudf::type_id::DURATION_DAYS, DATE) -// VELOX_TO_CUDF_TYPE(IntervalDayTime, INTERVAL_DAY_TIME) -VELOX_TO_CUDF_TYPE(cudf::type_id::DECIMAL64, SHORT_DECIMAL) -VELOX_TO_CUDF_TYPE(cudf::type_id::DECIMAL128, LONG_DECIMAL) -// VELOX_TO_CUDF_TYPE(Array, ARRAY) -// VELOX_TO_CUDF_TYPE(Map, MAP) -// VELOX_TO_CUDF_TYPE(Row, ROW) -// VELOX_TO_CUDF_TYPE(Opaque, OPAQUE) -// VELOX_TO_CUDF_TYPE(UnKnown, UNKNOWN) -*/ - -cudf::type_id velox_to_cudf_type_id(TypeKind kind) { - switch (kind) { +cudf::type_id velox_to_cudf_type_id(const TypePtr& type) { + if (cudfDebugEnabled()) { + std::cout << "Converting Velox type " << type->toString() << " to cudf" + << std::endl; + } + switch (type->kind()) { case TypeKind::BOOLEAN: return cudf::type_id::BOOL8; case TypeKind::TINYINT: @@ -87,6 +89,13 @@ cudf::type_id velox_to_cudf_type_id(TypeKind kind) { case TypeKind::SMALLINT: return cudf::type_id::INT16; case TypeKind::INTEGER: + // TODO: handle interval types (durations?) + // if (type->isIntervalYearMonth()) { + // return cudf::type_id::...; + // } + if (type->isDate()) { + return cudf::type_id::TIMESTAMP_DAYS; + } return cudf::type_id::INT32; case TypeKind::BIGINT: return cudf::type_id::INT64; @@ -118,60 +127,72 @@ cudf::type_id velox_to_cudf_type_id(TypeKind kind) { // case TypeKind::OPAQUE: return cudf::type_id::EMPTY; // case TypeKind::INVALID: return cudf::type_id::EMPTY; default: + CUDF_FAIL("Unsupported Velox type"); return cudf::type_id::EMPTY; } } -TypeKind cudf_to_velox_type_id(cudf::type_id kind) { - switch (kind) { +TypePtr cudf_type_id_to_velox_type(cudf::type_id type_id) { + switch (type_id) { case cudf::type_id::BOOL8: - return TypeKind::BOOLEAN; + return BOOLEAN(); case cudf::type_id::INT8: - return TypeKind::TINYINT; + return TINYINT(); case cudf::type_id::INT16: - return TypeKind::SMALLINT; + return SMALLINT(); case cudf::type_id::INT32: - return TypeKind::INTEGER; + return INTEGER(); case cudf::type_id::INT64: - return TypeKind::BIGINT; + return BIGINT(); case cudf::type_id::FLOAT32: - return TypeKind::REAL; + return REAL(); case cudf::type_id::FLOAT64: - return TypeKind::DOUBLE; + return DOUBLE(); case cudf::type_id::STRING: - return TypeKind::VARCHAR; + return VARCHAR(); + case cudf::type_id::TIMESTAMP_DAYS: + return DATE(); case cudf::type_id::TIMESTAMP_NANOSECONDS: - return TypeKind::TIMESTAMP; + return TIMESTAMP(); // TODO: DATE is now a logical type - // case cudf::type_id::DURATION_DAYS: return TypeKind::DATE; + // case cudf::type_id::DURATION_DAYS: return ???; // case cudf::type_id::EMPTY: return TypeKind::INTERVAL_DAY_TIME; // TODO: DECIMAL is now a logical type // case cudf::type_id::DECIMAL64: return TypeKind::SHORT_DECIMAL; // case cudf::type_id::DECIMAL128: return TypeKind::LONG_DECIMAL; // case cudf::type_id::EMPTY: return TypeKind::ARRAY; // case cudf::type_id::EMPTY: return TypeKind::MAP; - case cudf::type_id::STRUCT: - return TypeKind::ROW; + // case cudf::type_id::STRUCT: + // // TODO: Need parametric type support? + // return ROW(); // case cudf::type_id::EMPTY: return TypeKind::OPAQUE; // case cudf::type_id::EMPTY: return TypeKind::UNKNOWN; default: - return TypeKind::UNKNOWN; + return UNKNOWN(); } } // Convert a Velox vector to a CUDF column struct copy_to_device { rmm::cuda_stream_view stream; - template - static constexpr bool is_supported() { - return cudf::is_rep_layout_compatible(); - } + // Fixed width types - template ()>* = nullptr> - std::unique_ptr operator()(VectorPtr& h_vec) const { - auto velox_data = h_vec->as>(); + template < + typename T, + std::enable_if_t()>* = nullptr> + std::unique_ptr operator()(VectorPtr const& h_vec) const { + VELOX_CHECK_NOT_NULL(h_vec); + using velox_T = cudf_storage_type_t; + if (cudfDebugEnabled()) { + std::cout << "Converting fixed width column" << std::endl; + std::cout << "Encoding: " << h_vec->encoding() << std::endl; + std::cout << "Type: " << h_vec->type()->toString() << std::endl; + std::cout << "velox_T: " << typeid(velox_T{}).name() << std::endl; + } + auto velox_data = h_vec->as>(); + VELOX_CHECK_NOT_NULL(velox_data); auto velox_data_ptr = velox_data->rawValues(); - cudf::host_span velox_host_span( + cudf::host_span velox_host_span( velox_data_ptr, int{h_vec->size()}); auto d_v = cudf::detail::make_device_uvector_sync( velox_host_span, stream, rmm::mr::get_current_device_resource()); @@ -179,11 +200,73 @@ struct copy_to_device { std::move(d_v), rmm::device_buffer{}, 0); } + // Strings + template < + typename T, + std::enable_if_t>* = nullptr> + std::unique_ptr operator()(VectorPtr const& h_vec) const { + if (cudfDebugEnabled()) { + std::cout << "Converting string column" << std::endl; + } + + auto const num_rows = h_vec->size(); + auto h_offsets = std::vector(num_rows + 1); + h_offsets[0] = 0; + auto make_offsets = [&](auto const& vec) { + VELOX_CHECK_NOT_NULL(vec); + if (cudfDebugEnabled()) { + std::cout << "Starting offset calculation" << std::endl; + } + for (auto i = 0; i < num_rows; i++) { + h_offsets[i + 1] = h_offsets[i] + vec->valueAt(i).size(); + } + }; + vector_encoding_dispatcher(h_vec, make_offsets); + + auto d_offsets = cudf::detail::make_device_uvector_sync( + h_offsets, stream, rmm::mr::get_current_device_resource()); + + auto chars_size = h_offsets[num_rows]; + auto h_chars = std::vector(chars_size); + + auto make_chars = [&](auto vec) { + VELOX_CHECK_NOT_NULL(vec); + for (auto i = 0; i < num_rows; i++) { + auto const string_view = vec->valueAt(i); + auto const size = string_view.size(); + auto const offset = h_offsets[i]; + std::copy( + string_view.data(), + string_view.data() + size, + h_chars.begin() + offset); + } + }; + vector_encoding_dispatcher(h_vec, make_chars); + + auto d_chars = cudf::detail::make_device_uvector_sync( + h_chars, stream, rmm::mr::get_current_device_resource()); + + return cudf::make_strings_column( + num_rows, + std::make_unique( + std::move(d_offsets), rmm::device_buffer{}, 0), + d_chars.release(), + 0, + rmm::device_buffer{}); + } + template < typename T, typename... Args, - std::enable_if_t()>* = nullptr> - std::unique_ptr operator()(Args... args) const { + std::enable_if_t< + not(cudf::is_rep_layout_compatible() or + std::is_same_v)>* = nullptr> + std::unique_ptr operator()(VectorPtr const& h_vec) const { + if (cudfDebugEnabled()) { + std::string error_message = "Unsupported type for to_cudf conversion: "; + error_message += h_vec->type()->toString(); + std::cout << error_message << std::endl; + } CUDF_FAIL("Unsupported type for to_cudf conversion"); } }; @@ -197,9 +280,8 @@ std::unique_ptr to_cudf_table(const RowVectorPtr& leftBatch) { using cudf_col_ptr = std::unique_ptr; std::vector cudf_columns; auto copier = copy_to_device{cudf::get_default_stream()}; - for (auto& h_vec : leftBatch->children()) { - auto cudf_kind = - cudf::data_type{velox_to_cudf_type_id(h_vec->type()->kind())}; + for (auto const& h_vec : leftBatch->children()) { + auto cudf_kind = cudf::data_type{velox_to_cudf_type_id(h_vec->type())}; auto cudf_column = cudf::type_dispatcher(cudf_kind, copier, h_vec); cudf_columns.push_back(std::move(cudf_column)); } @@ -216,14 +298,18 @@ struct copy_to_host { // return cudf::is_rep_layout_compatible(); return cudf::is_numeric() and not std::is_same::value; } + // Fixed width types template ()>* = nullptr> VectorPtr operator()(TypePtr velox_type, cudf::column_view const& col) const { - // auto velox_col = BaseVector::create(velox_type, col.size(), pool_); - // auto velox_col = BaseVector::create >(velox_type, - // col.size(), pool_); - auto velox_col = test::VectorMaker{pool_}.flatVector(col.size()); - // auto velox_data = velox_col->as>(); + auto velox_buffer = AlignedBuffer::allocate(col.size(), pool_); + auto velox_col = std::make_shared>( + pool_, + velox_type, + nullptr, + col.size(), + velox_buffer, + std::vector{}); auto velox_data_ptr = velox_col->mutableRawValues(); CUDF_CUDA_TRY(cudaMemcpyAsync( velox_data_ptr, @@ -248,8 +334,11 @@ VectorPtr to_velox_column( const cudf::column_view& col, memory::MemoryPool* pool) { NVTX3_FUNC_RANGE(); - auto velox_kind = cudf_to_velox_type_id(col.type().id()); - auto velox_type = createScalarType(velox_kind); + auto velox_type = cudf_type_id_to_velox_type(col.type().id()); + if (cudfDebugEnabled()) { + std::cout << "Converting to_velox_column: " << velox_type->toString() + << std::endl; + } // cudf type dispatcher to copy data from cudf column to velox vector auto copier = copy_to_host{cudf::get_default_stream(), pool}; return cudf::type_dispatcher(col.type(), copier, velox_type, col); diff --git a/velox/experimental/cudf/tests/HashJoinTest.cpp b/velox/experimental/cudf/tests/HashJoinTest.cpp index 4e449c2748b1..6ed8e9cd5566 100644 --- a/velox/experimental/cudf/tests/HashJoinTest.cpp +++ b/velox/experimental/cudf/tests/HashJoinTest.cpp @@ -32,6 +32,7 @@ #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/exec/tests/utils/VectorTestUtil.h" #include "velox/experimental/cudf/exec/ToCudf.h" +#include "velox/experimental/cudf/exec/Utilities.h" #include "velox/vector/fuzzer/VectorFuzzer.h" using namespace facebook::velox; @@ -245,7 +246,7 @@ class HashJoinBuilder { planNode.get(), [](const core::PlanNode* node) { return dynamic_cast(node) != nullptr; }); - if (hash_node_ptr != nullptr) { + if (cudf_velox::cudfDebugEnabled() && hash_node_ptr != nullptr) { std::cout << "Found a HashJoinNode" << std::endl; } return *this; @@ -1043,9 +1044,13 @@ TEST_F(HashJoinTest, multipleProbeColumns) { TEST_F(HashJoinTest, multipleBuildColumns) { // Test hash join with multiple probe columns. auto probeVectors = std::vector{makeRowVector( - {"t_k1", "t_k2"}, + {"t_k1", "t_k2", "t_k3"}, {makeFlatVector(20, [](auto row) { return 1 + row % 2; }), - makeFlatVector(20, [](auto row) { return row; })})}; + makeFlatVector(20, [](auto row) { return row; }), + makeFlatVector(20, [&](auto row) { + auto temp = std::to_string(row % 3 + 1); + return StringView(temp); + })})}; auto buildVectors = std::vector{makeRowVector( {"u_k1", "u_k2"}, {makeFlatVector({1, 2}), makeFlatVector({3, 4})})};