From 154fa5948f8db2641d73b0b8bc4705387bb6808b Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Wed, 25 Jan 2023 15:44:11 -0800 Subject: [PATCH 1/4] Use in-memory communicator to test quantile --- tests/cpp/common/test_quantile.cc | 258 +++++++-------- tests/cpp/common/test_quantile.cu | 302 +++++++++--------- tests/cpp/common/test_quantile.h | 25 -- tests/cpp/helpers.cc | 11 +- tests/cpp/helpers.h | 6 +- tests/cpp/predictor/test_cpu_predictor.cc | 6 +- .../test_gpu_with_dask/test_gpu_with_dask.py | 43 --- .../test_with_dask/test_with_dask.py | 56 ---- 8 files changed, 299 insertions(+), 408 deletions(-) diff --git a/tests/cpp/common/test_quantile.cc b/tests/cpp/common/test_quantile.cc index 73fa4d5e74a1..52fb0d9689ea 100644 --- a/tests/cpp/common/test_quantile.cc +++ b/tests/cpp/common/test_quantile.cc @@ -5,8 +5,9 @@ #include +#include + #include "../../../src/common/hist_util.h" -#include "../../../src/common/quantile.h" #include "../../../src/data/adapter.h" namespace xgboost { @@ -44,158 +45,157 @@ void PushPage(HostSketchContainer* container, SparsePage const& page, MetaInfo c template void TestDistributedQuantile(size_t rows, size_t cols) { - std::string msg {"Skipping AllReduce test"}; - int32_t constexpr kWorkers = 4; - InitCommunicatorContext(msg, kWorkers); - auto world = collective::GetWorldSize(); - if (world != 1) { - ASSERT_EQ(world, kWorkers); - } else { - return; - } + auto constexpr kWorkers = 4; + std::vector threads; + for (auto rank = 0; rank < kWorkers; rank++) { + threads.emplace_back([=]() { + InitInMemoryCommunicator(kWorkers, rank); + + auto world = collective::GetWorldSize(); + if (world != 1) { + ASSERT_EQ(world, kWorkers); + } else { + return; + } - std::vector infos(2); - auto& h_weights = infos.front().weights_.HostVector(); - h_weights.resize(rows); - SimpleLCG lcg; - SimpleRealUniformDistribution dist(3, 1000); - std::generate(h_weights.begin(), h_weights.end(), [&]() { return dist(&lcg); }); - std::vector column_size(cols, rows); - size_t n_bins = 64; - - // Generate cuts for distributed environment. - auto sparsity = 0.5f; - auto rank = collective::GetRank(); - std::vector ft(cols); - for (size_t i = 0; i < ft.size(); ++i) { - ft[i] = (i % 2 == 0) ? FeatureType::kNumerical : FeatureType::kCategorical; - } + std::vector infos(2); + auto& h_weights = infos.front().weights_.HostVector(); + h_weights.resize(rows); + SimpleLCG lcg; + SimpleRealUniformDistribution dist(3, 1000); + std::generate(h_weights.begin(), h_weights.end(), [&]() { return dist(&lcg); }); + std::vector column_size(cols, rows); + size_t n_bins = 64; + + // Generate cuts for distributed environment. + auto sparsity = 0.5f; + std::vector ft(cols); + for (size_t i = 0; i < ft.size(); ++i) { + ft[i] = (i % 2 == 0) ? FeatureType::kNumerical : FeatureType::kCategorical; + } - auto m = RandomDataGenerator{rows, cols, sparsity} - .Seed(rank) - .Lower(.0f) - .Upper(1.0f) - .Type(ft) - .MaxCategory(13) - .GenerateDMatrix(); - - std::vector hessian(rows, 1.0); - auto hess = Span{hessian}; - - ContainerType sketch_distributed(n_bins, m->Info().feature_types.ConstHostSpan(), - column_size, false, OmpGetNumThreads(0)); - - if (use_column) { - for (auto const& page : m->GetBatches()) { - PushPage(&sketch_distributed, page, m->Info(), hess); - } - } else { - for (auto const& page : m->GetBatches()) { - PushPage(&sketch_distributed, page, m->Info(), hess); - } - } + auto m = RandomDataGenerator{rows, cols, sparsity} + .Seed(rank) + .Lower(.0f) + .Upper(1.0f) + .Type(ft) + .MaxCategory(13) + .GenerateDMatrix(); + + std::vector hessian(rows, 1.0); + auto hess = Span{hessian}; - HistogramCuts distributed_cuts; - sketch_distributed.MakeCuts(&distributed_cuts); - - // Generate cuts for single node environment - collective::Finalize(); - CHECK_EQ(collective::GetWorldSize(), 1); - std::for_each(column_size.begin(), column_size.end(), [=](auto& size) { size *= world; }); - m->Info().num_row_ = world * rows; - ContainerType sketch_on_single_node(n_bins, m->Info().feature_types.ConstHostSpan(), - column_size, false, OmpGetNumThreads(0)); - m->Info().num_row_ = rows; - - for (auto rank = 0; rank < world; ++rank) { - auto m = RandomDataGenerator{rows, cols, sparsity} - .Seed(rank) - .Type(ft) - .MaxCategory(13) - .Lower(.0f) - .Upper(1.0f) - .GenerateDMatrix(); - if (use_column) { - for (auto const& page : m->GetBatches()) { - PushPage(&sketch_on_single_node, page, m->Info(), hess); + ContainerType sketch_distributed(n_bins, m->Info().feature_types.ConstHostSpan(), + column_size, false, OmpGetNumThreads(0)); + + if (use_column) { + for (auto const& page : m->GetBatches()) { + PushPage(&sketch_distributed, page, m->Info(), hess); + } + } else { + for (auto const& page : m->GetBatches()) { + PushPage(&sketch_distributed, page, m->Info(), hess); + } } - } else { - for (auto const& page : m->GetBatches()) { - PushPage(&sketch_on_single_node, page, m->Info(), hess); + + HistogramCuts distributed_cuts; + sketch_distributed.MakeCuts(&distributed_cuts); + + // Generate cuts for single node environment + collective::Finalize(); + CHECK_EQ(collective::GetWorldSize(), 1); + std::for_each(column_size.begin(), column_size.end(), [=](auto& size) { size *= world; }); + m->Info().num_row_ = world * rows; + ContainerType sketch_on_single_node( + n_bins, m->Info().feature_types.ConstHostSpan(), column_size, false, OmpGetNumThreads(0)); + m->Info().num_row_ = rows; + + for (auto rank = 0; rank < world; ++rank) { + auto m = RandomDataGenerator{rows, cols, sparsity} + .Seed(rank) + .Type(ft) + .MaxCategory(13) + .Lower(.0f) + .Upper(1.0f) + .GenerateDMatrix(); + if (use_column) { + for (auto const& page : m->GetBatches()) { + PushPage(&sketch_on_single_node, page, m->Info(), hess); + } + } else { + for (auto const& page : m->GetBatches()) { + PushPage(&sketch_on_single_node, page, m->Info(), hess); + } + } } - } - } - HistogramCuts single_node_cuts; - sketch_on_single_node.MakeCuts(&single_node_cuts); + HistogramCuts single_node_cuts; + sketch_on_single_node.MakeCuts(&single_node_cuts); - auto const& sptrs = single_node_cuts.Ptrs(); - auto const& dptrs = distributed_cuts.Ptrs(); - auto const& svals = single_node_cuts.Values(); - auto const& dvals = distributed_cuts.Values(); - auto const& smins = single_node_cuts.MinValues(); - auto const& dmins = distributed_cuts.MinValues(); + auto const& sptrs = single_node_cuts.Ptrs(); + auto const& dptrs = distributed_cuts.Ptrs(); + auto const& svals = single_node_cuts.Values(); + auto const& dvals = distributed_cuts.Values(); + auto const& smins = single_node_cuts.MinValues(); + auto const& dmins = distributed_cuts.MinValues(); - ASSERT_EQ(sptrs.size(), dptrs.size()); - for (size_t i = 0; i < sptrs.size(); ++i) { - ASSERT_EQ(sptrs[i], dptrs[i]) << i; - } + ASSERT_EQ(sptrs.size(), dptrs.size()); + for (size_t i = 0; i < sptrs.size(); ++i) { + ASSERT_EQ(sptrs[i], dptrs[i]) << i; + } - ASSERT_EQ(svals.size(), dvals.size()); - for (size_t i = 0; i < svals.size(); ++i) { - ASSERT_NEAR(svals[i], dvals[i], 2e-2f); - } + ASSERT_EQ(svals.size(), dvals.size()); + for (size_t i = 0; i < svals.size(); ++i) { + ASSERT_NEAR(svals[i], dvals[i], 2e-2f); + } - ASSERT_EQ(smins.size(), dmins.size()); - for (size_t i = 0; i < smins.size(); ++i) { - ASSERT_FLOAT_EQ(smins[i], dmins[i]); + ASSERT_EQ(smins.size(), dmins.size()); + for (size_t i = 0; i < smins.size(); ++i) { + ASSERT_FLOAT_EQ(smins[i], dmins[i]); + } + }); + } + for (auto& thread : threads) { + thread.join(); } } TEST(Quantile, DistributedBasic) { -#if defined(__unix__) constexpr size_t kRows = 10, kCols = 10; TestDistributedQuantile(kRows, kCols); -#endif } TEST(Quantile, Distributed) { -#if defined(__unix__) constexpr size_t kRows = 4000, kCols = 200; TestDistributedQuantile(kRows, kCols); -#endif } TEST(Quantile, SortedDistributedBasic) { -#if defined(__unix__) constexpr size_t kRows = 10, kCols = 10; TestDistributedQuantile(kRows, kCols); -#endif } TEST(Quantile, SortedDistributed) { -#if defined(__unix__) constexpr size_t kRows = 4000, kCols = 200; TestDistributedQuantile(kRows, kCols); -#endif } TEST(Quantile, SameOnAllWorkers) { -#if defined(__unix__) - std::string msg{"Skipping Quantile AllreduceBasic test"}; - int32_t constexpr kWorkers = 4; - InitCommunicatorContext(msg, kWorkers); - auto world = collective::GetWorldSize(); - if (world != 1) { - CHECK_EQ(world, kWorkers); - } else { - LOG(WARNING) << msg; - return; - } + auto constexpr kWorkers = 4; + std::vector threads; + for (auto rank = 0; rank < kWorkers; rank++) { + threads.emplace_back([=]() { + InitInMemoryCommunicator(kWorkers, rank); + + auto world = collective::GetWorldSize(); + if (world != 1) { + ASSERT_EQ(world, kWorkers); + } else { + return; + } - constexpr size_t kRows = 1000, kCols = 100; - RunWithSeedsAndBins( - kRows, [=](int32_t seed, size_t n_bins, MetaInfo const&) { + constexpr size_t kRows = 1000, kCols = 100; + RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const&) { auto rank = collective::GetRank(); HostDeviceVector storage; std::vector ft(kCols); @@ -211,9 +211,8 @@ TEST(Quantile, SameOnAllWorkers) { .GenerateDMatrix(); auto cuts = SketchOnDMatrix(m.get(), n_bins, common::OmpGetNumThreads(0)); std::vector cut_values(cuts.Values().size() * world, 0); - std::vector< - typename std::remove_reference_t::value_type> - cut_ptrs(cuts.Ptrs().size() * world, 0); + std::vector::value_type> cut_ptrs( + cuts.Ptrs().size() * world, 0); std::vector cut_min_values(cuts.MinValues().size() * world, 0); size_t value_size = cuts.Values().size(); @@ -226,18 +225,17 @@ TEST(Quantile, SameOnAllWorkers) { CHECK_EQ(min_value_size, kCols); size_t value_offset = value_size * rank; - std::copy(cuts.Values().begin(), cuts.Values().end(), - cut_values.begin() + value_offset); + std::copy(cuts.Values().begin(), cuts.Values().end(), cut_values.begin() + value_offset); size_t ptr_offset = ptr_size * rank; - std::copy(cuts.Ptrs().cbegin(), cuts.Ptrs().cend(), - cut_ptrs.begin() + ptr_offset); + std::copy(cuts.Ptrs().cbegin(), cuts.Ptrs().cend(), cut_ptrs.begin() + ptr_offset); size_t min_values_offset = min_value_size * rank; std::copy(cuts.MinValues().cbegin(), cuts.MinValues().cend(), cut_min_values.begin() + min_values_offset); collective::Allreduce(cut_values.data(), cut_values.size()); collective::Allreduce(cut_ptrs.data(), cut_ptrs.size()); - collective::Allreduce(cut_min_values.data(), cut_min_values.size()); + collective::Allreduce(cut_min_values.data(), + cut_min_values.size()); for (int32_t i = 0; i < world; i++) { for (size_t j = 0; j < value_size; ++j) { @@ -256,8 +254,12 @@ TEST(Quantile, SameOnAllWorkers) { } } }); - collective::Finalize(); -#endif // defined(__unix__) + collective::Finalize(); + }); + } + for (auto& thread : threads) { + thread.join(); + } } } // namespace common } // namespace xgboost diff --git a/tests/cpp/common/test_quantile.cu b/tests/cpp/common/test_quantile.cu index 4d6fecd8849f..47610a07ef53 100644 --- a/tests/cpp/common/test_quantile.cu +++ b/tests/cpp/common/test_quantile.cu @@ -1,9 +1,12 @@ #include -#include "test_quantile.h" -#include "../helpers.h" + +#include + #include "../../../src/collective/device_communicator.cuh" #include "../../../src/common/hist_util.cuh" #include "../../../src/common/quantile.cuh" +#include "../helpers.h" +#include "test_quantile.h" namespace xgboost { namespace { @@ -339,156 +342,161 @@ TEST(GPUQuantile, MultiMerge) { } TEST(GPUQuantile, AllReduceBasic) { - // This test is supposed to run by a python test that setups the environment. - std::string msg {"Skipping AllReduce test"}; - auto n_gpus = AllVisibleGPUs(); - InitCommunicatorContext(msg, n_gpus); - auto world = collective::GetWorldSize(); - if (world != 1) { - ASSERT_EQ(world, n_gpus); - } else { - return; - } - - constexpr size_t kRows = 1000, kCols = 100; - RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const& info) { - // Set up single node version; - HostDeviceVector ft; - SketchContainer sketch_on_single_node(ft, n_bins, kCols, kRows, 0); - - size_t intermediate_num_cuts = std::min( - kRows * world, static_cast(n_bins * WQSketch::kFactor)); - std::vector containers; - for (auto rank = 0; rank < world; ++rank) { - HostDeviceVector storage; - std::string interface_str = RandomDataGenerator{kRows, kCols, 0} - .Device(0) - .Seed(rank + seed) - .GenerateArrayInterface(&storage); - data::CupyAdapter adapter(interface_str); - HostDeviceVector ft; - containers.emplace_back(ft, n_bins, kCols, kRows, 0); - AdapterDeviceSketch(adapter.Value(), n_bins, info, - std::numeric_limits::quiet_NaN(), - &containers.back()); - } - for (auto &sketch : containers) { - sketch.Prune(intermediate_num_cuts); - sketch_on_single_node.Merge(sketch.ColumnsPtr(), sketch.Data()); - sketch_on_single_node.FixError(); - } - sketch_on_single_node.Unique(); - TestQuantileElemRank(0, sketch_on_single_node.Data(), - sketch_on_single_node.ColumnsPtr(), true); + auto const n_gpus = AllVisibleGPUs(); + std::vector threads; + for (auto rank = 0; rank < n_gpus; rank++) { + threads.emplace_back([=]() { + InitInMemoryCommunicator(n_gpus, rank); + + auto const world = collective::GetWorldSize(); + if (world != 1) { + ASSERT_EQ(world, n_gpus); + } else { + return; + } - // Set up distributed version. We rely on using rank as seed to generate - // the exact same copy of data. - auto rank = collective::GetRank(); - SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); - HostDeviceVector storage; - std::string interface_str = RandomDataGenerator{kRows, kCols, 0} - .Device(0) - .Seed(rank + seed) - .GenerateArrayInterface(&storage); - data::CupyAdapter adapter(interface_str); - AdapterDeviceSketch(adapter.Value(), n_bins, info, - std::numeric_limits::quiet_NaN(), - &sketch_distributed); - sketch_distributed.AllReduce(); - sketch_distributed.Unique(); - - ASSERT_EQ(sketch_distributed.ColumnsPtr().size(), - sketch_on_single_node.ColumnsPtr().size()); - ASSERT_EQ(sketch_distributed.Data().size(), - sketch_on_single_node.Data().size()); - - TestQuantileElemRank(0, sketch_distributed.Data(), - sketch_distributed.ColumnsPtr(), true); - - std::vector single_node_data( - sketch_on_single_node.Data().size()); - dh::CopyDeviceSpanToVector(&single_node_data, sketch_on_single_node.Data()); - - std::vector distributed_data(sketch_distributed.Data().size()); - dh::CopyDeviceSpanToVector(&distributed_data, sketch_distributed.Data()); - float Eps = 2e-4 * world; - - for (size_t i = 0; i < single_node_data.size(); ++i) { - ASSERT_NEAR(single_node_data[i].value, distributed_data[i].value, Eps); - ASSERT_NEAR(single_node_data[i].rmax, distributed_data[i].rmax, Eps); - ASSERT_NEAR(single_node_data[i].rmin, distributed_data[i].rmin, Eps); - ASSERT_NEAR(single_node_data[i].wmin, distributed_data[i].wmin, Eps); - } - }); - collective::Finalize(); + constexpr size_t kRows = 1000, kCols = 100; + RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const& info) { + // Set up single node version; + HostDeviceVector ft; + SketchContainer sketch_on_single_node(ft, n_bins, kCols, kRows, 0); + + size_t intermediate_num_cuts = + std::min(kRows * world, static_cast(n_bins * WQSketch::kFactor)); + std::vector containers; + for (auto rank = 0; rank < world; ++rank) { + HostDeviceVector storage; + std::string interface_str = RandomDataGenerator{kRows, kCols, 0} + .Device(0) + .Seed(rank + seed) + .GenerateArrayInterface(&storage); + data::CupyAdapter adapter(interface_str); + HostDeviceVector ft; + containers.emplace_back(ft, n_bins, kCols, kRows, 0); + AdapterDeviceSketch(adapter.Value(), n_bins, info, + std::numeric_limits::quiet_NaN(), &containers.back()); + } + for (auto& sketch : containers) { + sketch.Prune(intermediate_num_cuts); + sketch_on_single_node.Merge(sketch.ColumnsPtr(), sketch.Data()); + sketch_on_single_node.FixError(); + } + sketch_on_single_node.Unique(); + TestQuantileElemRank(0, sketch_on_single_node.Data(), sketch_on_single_node.ColumnsPtr(), + true); + + // Set up distributed version. We rely on using rank as seed to generate + // the exact same copy of data. + auto rank = collective::GetRank(); + SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); + HostDeviceVector storage; + std::string interface_str = RandomDataGenerator{kRows, kCols, 0} + .Device(0) + .Seed(rank + seed) + .GenerateArrayInterface(&storage); + data::CupyAdapter adapter(interface_str); + AdapterDeviceSketch(adapter.Value(), n_bins, info, std::numeric_limits::quiet_NaN(), + &sketch_distributed); + sketch_distributed.AllReduce(); + sketch_distributed.Unique(); + + ASSERT_EQ(sketch_distributed.ColumnsPtr().size(), + sketch_on_single_node.ColumnsPtr().size()); + ASSERT_EQ(sketch_distributed.Data().size(), sketch_on_single_node.Data().size()); + + TestQuantileElemRank(0, sketch_distributed.Data(), sketch_distributed.ColumnsPtr(), true); + + std::vector single_node_data(sketch_on_single_node.Data().size()); + dh::CopyDeviceSpanToVector(&single_node_data, sketch_on_single_node.Data()); + + std::vector distributed_data(sketch_distributed.Data().size()); + dh::CopyDeviceSpanToVector(&distributed_data, sketch_distributed.Data()); + float Eps = 2e-4 * world; + + for (size_t i = 0; i < single_node_data.size(); ++i) { + ASSERT_NEAR(single_node_data[i].value, distributed_data[i].value, Eps); + ASSERT_NEAR(single_node_data[i].rmax, distributed_data[i].rmax, Eps); + ASSERT_NEAR(single_node_data[i].rmin, distributed_data[i].rmin, Eps); + ASSERT_NEAR(single_node_data[i].wmin, distributed_data[i].wmin, Eps); + } + }); + collective::Finalize(); + }); + } + for (auto& thread : threads) { + thread.join(); + } } TEST(GPUQuantile, SameOnAllWorkers) { - std::string msg {"Skipping SameOnAllWorkers test"}; - auto n_gpus = AllVisibleGPUs(); - InitCommunicatorContext(msg, n_gpus); - auto world = collective::GetWorldSize(); - if (world != 1) { - ASSERT_EQ(world, n_gpus); - } else { - return; - } - - constexpr size_t kRows = 1000, kCols = 100; - RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, - MetaInfo const &info) { - auto rank = collective::GetRank(); - HostDeviceVector ft; - SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); - HostDeviceVector storage; - std::string interface_str = RandomDataGenerator{kRows, kCols, 0} - .Device(0) - .Seed(rank + seed) - .GenerateArrayInterface(&storage); - data::CupyAdapter adapter(interface_str); - AdapterDeviceSketch(adapter.Value(), n_bins, info, - std::numeric_limits::quiet_NaN(), - &sketch_distributed); - sketch_distributed.AllReduce(); - sketch_distributed.Unique(); - TestQuantileElemRank(0, sketch_distributed.Data(), sketch_distributed.ColumnsPtr(), true); - - // Test for all workers having the same sketch. - size_t n_data = sketch_distributed.Data().size(); - collective::Allreduce(&n_data, 1); - ASSERT_EQ(n_data, sketch_distributed.Data().size()); - size_t size_as_float = - sketch_distributed.Data().size_bytes() / sizeof(float); - auto local_data = Span{ - reinterpret_cast(sketch_distributed.Data().data()), - size_as_float}; - - dh::caching_device_vector all_workers(size_as_float * world); - thrust::fill(all_workers.begin(), all_workers.end(), 0); - thrust::copy(thrust::device, local_data.data(), - local_data.data() + local_data.size(), - all_workers.begin() + local_data.size() * rank); - collective::DeviceCommunicator* communicator = collective::Communicator::GetDevice(0); - - communicator->AllReduceSum(all_workers.data().get(), all_workers.size()); - communicator->Synchronize(); - - auto base_line = dh::ToSpan(all_workers).subspan(0, size_as_float); - std::vector h_base_line(base_line.size()); - dh::CopyDeviceSpanToVector(&h_base_line, base_line); - - size_t offset = 0; - for (decltype(world) i = 0; i < world; ++i) { - auto comp = dh::ToSpan(all_workers).subspan(offset, size_as_float); - std::vector h_comp(comp.size()); - dh::CopyDeviceSpanToVector(&h_comp, comp); - ASSERT_EQ(comp.size(), base_line.size()); - for (size_t j = 0; j < h_comp.size(); ++j) { - ASSERT_NEAR(h_base_line[j], h_comp[j], kRtEps); + auto const n_gpus = AllVisibleGPUs(); + std::vector threads; + for (auto rank = 0; rank < n_gpus; rank++) { + threads.emplace_back([=]() { + InitInMemoryCommunicator(n_gpus, rank); + + auto world = collective::GetWorldSize(); + if (world != 1) { + ASSERT_EQ(world, n_gpus); + } else { + return; } - offset += size_as_float; - } - }); + + constexpr size_t kRows = 1000, kCols = 100; + RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const& info) { + auto rank = collective::GetRank(); + HostDeviceVector ft; + SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); + HostDeviceVector storage; + std::string interface_str = RandomDataGenerator{kRows, kCols, 0} + .Device(0) + .Seed(rank + seed) + .GenerateArrayInterface(&storage); + data::CupyAdapter adapter(interface_str); + AdapterDeviceSketch(adapter.Value(), n_bins, info, std::numeric_limits::quiet_NaN(), + &sketch_distributed); + sketch_distributed.AllReduce(); + sketch_distributed.Unique(); + TestQuantileElemRank(0, sketch_distributed.Data(), sketch_distributed.ColumnsPtr(), true); + + // Test for all workers having the same sketch. + size_t n_data = sketch_distributed.Data().size(); + collective::Allreduce(&n_data, 1); + ASSERT_EQ(n_data, sketch_distributed.Data().size()); + size_t size_as_float = sketch_distributed.Data().size_bytes() / sizeof(float); + auto local_data = Span{ + reinterpret_cast(sketch_distributed.Data().data()), size_as_float}; + + dh::caching_device_vector all_workers(size_as_float * world); + thrust::fill(all_workers.begin(), all_workers.end(), 0); + thrust::copy(thrust::device, local_data.data(), local_data.data() + local_data.size(), + all_workers.begin() + local_data.size() * rank); + collective::DeviceCommunicator* communicator = collective::Communicator::GetDevice(0); + + communicator->AllReduceSum(all_workers.data().get(), all_workers.size()); + communicator->Synchronize(); + + auto base_line = dh::ToSpan(all_workers).subspan(0, size_as_float); + std::vector h_base_line(base_line.size()); + dh::CopyDeviceSpanToVector(&h_base_line, base_line); + + size_t offset = 0; + for (decltype(world) i = 0; i < world; ++i) { + auto comp = dh::ToSpan(all_workers).subspan(offset, size_as_float); + std::vector h_comp(comp.size()); + dh::CopyDeviceSpanToVector(&h_comp, comp); + ASSERT_EQ(comp.size(), base_line.size()); + for (size_t j = 0; j < h_comp.size(); ++j) { + ASSERT_NEAR(h_base_line[j], h_comp[j], kRtEps); + } + offset += size_as_float; + } + }); + }); + } + for (auto& thread : threads) { + thread.join(); + } } TEST(GPUQuantile, Push) { diff --git a/tests/cpp/common/test_quantile.h b/tests/cpp/common/test_quantile.h index 0d2e52cae9a6..957e5c98767b 100644 --- a/tests/cpp/common/test_quantile.h +++ b/tests/cpp/common/test_quantile.h @@ -10,31 +10,6 @@ namespace xgboost { namespace common { -inline void InitCommunicatorContext(std::string msg, int32_t n_workers) { - auto port = std::getenv("DMLC_TRACKER_PORT"); - std::string port_str; - if (port) { - port_str = port; - } else { - LOG(WARNING) << msg << " as `DMLC_TRACKER_PORT` is not set up."; - return; - } - auto uri = std::getenv("DMLC_TRACKER_URI"); - std::string uri_str; - if (uri) { - uri_str = uri; - } else { - LOG(WARNING) << msg << " as `DMLC_TRACKER_URI` is not set up."; - return; - } - - Json config{JsonObject()}; - config["DMLC_TRACKER_PORT"] = port_str; - config["DMLC_TRACKER_URI"] = uri_str; - config["DMLC_NUM_WORKER"] = n_workers; - collective::Init(config); -} - template void RunWithSeedsAndBins(size_t rows, Fn fn) { std::vector seeds(2); SimpleLCG lcg; diff --git a/tests/cpp/helpers.cc b/tests/cpp/helpers.cc index 04c3dd3ad530..865fe1e19488 100644 --- a/tests/cpp/helpers.cc +++ b/tests/cpp/helpers.cc @@ -15,11 +15,11 @@ #include #include +#include "../../src/collective/communicator-inl.h" #include "../../src/data/adapter.h" #include "../../src/data/iterative_dmatrix.h" #include "../../src/data/simple_dmatrix.h" #include "../../src/data/sparse_page_dmatrix.h" -#include "../../src/gbm/gbtree_model.h" #include "filesystem.h" // dmlc::TemporaryDirectory #include "xgboost/c_api.h" #include "xgboost/predictor.h" @@ -661,4 +661,13 @@ void DeleteRMMResource(RMMAllocator*) {} RMMAllocatorPtr SetUpRMMResourceForCppTests(int, char**) { return {nullptr, DeleteRMMResource}; } #endif // !defined(XGBOOST_USE_RMM) || XGBOOST_USE_RMM != 1 + +void InitInMemoryCommunicator(int32_t word_size, int32_t rank) { + Json config{JsonObject()}; + config["xgboost_communicator"] = String("in-memory"); + config["in_memory_world_size"] = word_size; + config["in_memory_rank"] = rank; + xgboost::collective::Init(config); +} + } // namespace xgboost diff --git a/tests/cpp/helpers.h b/tests/cpp/helpers.h index d3ccff2844b3..01f06b95fb76 100644 --- a/tests/cpp/helpers.h +++ b/tests/cpp/helpers.h @@ -1,8 +1,7 @@ /** * Copyright 2016-2023 by XGBoost contributors */ -#ifndef XGBOOST_TESTS_CPP_HELPERS_H_ -#define XGBOOST_TESTS_CPP_HELPERS_H_ +#pragma once #include #include @@ -460,5 +459,6 @@ inline LearnerModelParam MakeMP(bst_feature_t n_features, float base_score, uint return mparam; } +void InitInMemoryCommunicator(int32_t word_size, int32_t rank); + } // namespace xgboost -#endif diff --git a/tests/cpp/predictor/test_cpu_predictor.cc b/tests/cpp/predictor/test_cpu_predictor.cc index f0c50fa94ecc..480c79501631 100644 --- a/tests/cpp/predictor/test_cpu_predictor.cc +++ b/tests/cpp/predictor/test_cpu_predictor.cc @@ -100,11 +100,7 @@ TEST(CpuPredictor, ColumnSplit) { size_t constexpr kSliceSize = (kCols + 1) / kWorldSize; for (auto rank = 0; rank < kWorldSize; rank++) { threads.emplace_back([=, &dmat]() { - Json config{JsonObject()}; - config["xgboost_communicator"] = String("in-memory"); - config["in_memory_world_size"] = kWorldSize; - config["in_memory_rank"] = rank; - xgboost::collective::Init(config); + InitInMemoryCommunicator(kWorldSize, rank); auto lparam = CreateEmptyGenericParam(GPUIDX); std::unique_ptr cpu_predictor = diff --git a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py index ee00874efd00..4e61d9023a3c 100644 --- a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py +++ b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py @@ -486,49 +486,6 @@ def test_interface_consistency(self) -> None: for rn, drn in zip(ranker_names, dranker_names): assert rn == drn - def run_quantile(self, name: str, local_cuda_client: Client) -> None: - exe = None - for possible_path in { - "./testxgboost", - "./build/testxgboost", - "../build/testxgboost", - "../gpu-build/testxgboost", - }: - if os.path.exists(possible_path): - exe = possible_path - assert exe, "No testxgboost executable found." - test = "--gtest_filter=GPUQuantile." + name - - def runit( - worker_addr: str, rabit_args: Dict[str, Union[int, str]] - ) -> subprocess.CompletedProcess: - # setup environment for running the c++ part. - env = os.environ.copy() - env['DMLC_TRACKER_PORT'] = str(rabit_args['DMLC_TRACKER_PORT']) - env["DMLC_TRACKER_URI"] = str(rabit_args["DMLC_TRACKER_URI"]) - return subprocess.run([str(exe), test], env=env, stdout=subprocess.PIPE) - - workers = tm.get_client_workers(local_cuda_client) - rabit_args = local_cuda_client.sync( - dxgb._get_rabit_args, len(workers), None, local_cuda_client - ) - futures = local_cuda_client.map( - runit, workers, pure=False, workers=workers, rabit_args=rabit_args - ) - results = local_cuda_client.gather(futures) - for ret in results: - msg = ret.stdout.decode("utf-8") - assert msg.find("1 test from GPUQuantile") != -1, msg - assert ret.returncode == 0, msg - - @pytest.mark.gtest - def test_quantile_basic(self, local_cuda_client: Client) -> None: - self.run_quantile("AllReduceBasic", local_cuda_client) - - @pytest.mark.gtest - def test_quantile_same_on_all_workers(self, local_cuda_client: Client) -> None: - self.run_quantile("SameOnAllWorkers", local_cuda_client) - @pytest.mark.skipif(**tm.no_cupy()) def test_with_asyncio(local_cuda_client: Client) -> None: diff --git a/tests/test_distributed/test_with_dask/test_with_dask.py b/tests/test_distributed/test_with_dask/test_with_dask.py index 244c6f1e2799..03f3a3e467b2 100644 --- a/tests/test_distributed/test_with_dask/test_with_dask.py +++ b/tests/test_distributed/test_with_dask/test_with_dask.py @@ -1490,62 +1490,6 @@ def test_approx( num_rounds = 10 self.run_updater_test(client, params, num_rounds, dataset, 'approx') - def run_quantile(self, name: str) -> None: - exe: Optional[str] = None - for possible_path in {'./testxgboost', './build/testxgboost', - '../build/cpubuild/testxgboost', - '../cpu-build/testxgboost'}: - if os.path.exists(possible_path): - exe = possible_path - if exe is None: - return - - test = "--gtest_filter=Quantile." + name - - def runit( - worker_addr: str, rabit_args: Dict[str, Union[int, str]] - ) -> subprocess.CompletedProcess: - # setup environment for running the c++ part. - env = os.environ.copy() - env['DMLC_TRACKER_PORT'] = str(rabit_args['DMLC_TRACKER_PORT']) - env["DMLC_TRACKER_URI"] = str(rabit_args["DMLC_TRACKER_URI"]) - return subprocess.run([str(exe), test], env=env, capture_output=True) - - with LocalCluster(n_workers=4, dashboard_address=":0") as cluster: - with Client(cluster) as client: - workers = tm.get_client_workers(client) - rabit_args = client.sync( - xgb.dask._get_rabit_args, len(workers), None, client - ) - futures = client.map(runit, - workers, - pure=False, - workers=workers, - rabit_args=rabit_args) - results = client.gather(futures) - - for ret in results: - msg = ret.stdout.decode('utf-8') - assert msg.find('1 test from Quantile') != -1, msg - assert ret.returncode == 0, msg - - @pytest.mark.skipif(**tm.no_dask()) - @pytest.mark.gtest - def test_quantile_basic(self) -> None: - self.run_quantile('DistributedBasic') - self.run_quantile('SortedDistributedBasic') - - @pytest.mark.skipif(**tm.no_dask()) - @pytest.mark.gtest - def test_quantile(self) -> None: - self.run_quantile('Distributed') - self.run_quantile('SortedDistributed') - - @pytest.mark.skipif(**tm.no_dask()) - @pytest.mark.gtest - def test_quantile_same_on_all_workers(self) -> None: - self.run_quantile("SameOnAllWorkers") - def test_adaptive(self) -> None: def get_score(config: Dict) -> float: return float(config["learner"]["learner_model_param"]["base_score"]) From 54b175fe7f58b17cb8a836c3f4464389deaa03ec Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 26 Jan 2023 09:33:17 -0800 Subject: [PATCH 2/4] better helper --- tests/cpp/common/test_quantile.cc | 261 +++++++++---------- tests/cpp/common/test_quantile.cu | 304 +++++++++++----------- tests/cpp/helpers.cc | 11 +- tests/cpp/helpers.h | 23 +- tests/cpp/predictor/test_cpu_predictor.cc | 63 +++-- tests/pytest.ini | 1 - 6 files changed, 332 insertions(+), 331 deletions(-) diff --git a/tests/cpp/common/test_quantile.cc b/tests/cpp/common/test_quantile.cc index 52fb0d9689ea..d6b5936ab4e2 100644 --- a/tests/cpp/common/test_quantile.cc +++ b/tests/cpp/common/test_quantile.cc @@ -5,9 +5,8 @@ #include -#include - #include "../../../src/common/hist_util.h" +#include "../../../src/common/quantile.h" #include "../../../src/data/adapter.h" namespace xgboost { @@ -41,125 +40,121 @@ void PushPage(HostSketchContainer* container, SparsePage const& page, MetaInfo c Span hessian) { container->PushRowPage(page, info, hessian); } -} // anonymous namespace template -void TestDistributedQuantile(size_t rows, size_t cols) { - auto constexpr kWorkers = 4; - std::vector threads; - for (auto rank = 0; rank < kWorkers; rank++) { - threads.emplace_back([=]() { - InitInMemoryCommunicator(kWorkers, rank); - - auto world = collective::GetWorldSize(); - if (world != 1) { - ASSERT_EQ(world, kWorkers); - } else { - return; - } - - std::vector infos(2); - auto& h_weights = infos.front().weights_.HostVector(); - h_weights.resize(rows); - SimpleLCG lcg; - SimpleRealUniformDistribution dist(3, 1000); - std::generate(h_weights.begin(), h_weights.end(), [&]() { return dist(&lcg); }); - std::vector column_size(cols, rows); - size_t n_bins = 64; - - // Generate cuts for distributed environment. - auto sparsity = 0.5f; - std::vector ft(cols); - for (size_t i = 0; i < ft.size(); ++i) { - ft[i] = (i % 2 == 0) ? FeatureType::kNumerical : FeatureType::kCategorical; - } - - auto m = RandomDataGenerator{rows, cols, sparsity} - .Seed(rank) - .Lower(.0f) - .Upper(1.0f) - .Type(ft) - .MaxCategory(13) - .GenerateDMatrix(); +void DoTestDistributedQuantile(int32_t workers, size_t rows, size_t cols) { + auto const world = collective::GetWorldSize(); + if (world != 1) { + ASSERT_EQ(world, workers); + } else { + return; + } - std::vector hessian(rows, 1.0); - auto hess = Span{hessian}; + std::vector infos(2); + auto& h_weights = infos.front().weights_.HostVector(); + h_weights.resize(rows); + SimpleLCG lcg; + SimpleRealUniformDistribution dist(3, 1000); + std::generate(h_weights.begin(), h_weights.end(), [&]() { return dist(&lcg); }); + std::vector column_size(cols, rows); + size_t n_bins = 64; + + // Generate cuts for distributed environment. + auto sparsity = 0.5f; + auto rank = collective::GetRank(); + std::vector ft(cols); + for (size_t i = 0; i < ft.size(); ++i) { + ft[i] = (i % 2 == 0) ? FeatureType::kNumerical : FeatureType::kCategorical; + } - ContainerType sketch_distributed(n_bins, m->Info().feature_types.ConstHostSpan(), - column_size, false, OmpGetNumThreads(0)); + auto m = RandomDataGenerator{rows, cols, sparsity} + .Seed(rank) + .Lower(.0f) + .Upper(1.0f) + .Type(ft) + .MaxCategory(13) + .GenerateDMatrix(); + + std::vector hessian(rows, 1.0); + auto hess = Span{hessian}; + + ContainerType sketch_distributed(n_bins, m->Info().feature_types.ConstHostSpan(), + column_size, false, OmpGetNumThreads(0)); + + if (use_column) { + for (auto const& page : m->GetBatches()) { + PushPage(&sketch_distributed, page, m->Info(), hess); + } + } else { + for (auto const& page : m->GetBatches()) { + PushPage(&sketch_distributed, page, m->Info(), hess); + } + } - if (use_column) { - for (auto const& page : m->GetBatches()) { - PushPage(&sketch_distributed, page, m->Info(), hess); - } - } else { - for (auto const& page : m->GetBatches()) { - PushPage(&sketch_distributed, page, m->Info(), hess); - } + HistogramCuts distributed_cuts; + sketch_distributed.MakeCuts(&distributed_cuts); + + // Generate cuts for single node environment + collective::Finalize(); + CHECK_EQ(collective::GetWorldSize(), 1); + std::for_each(column_size.begin(), column_size.end(), [=](auto& size) { size *= world; }); + m->Info().num_row_ = world * rows; + ContainerType sketch_on_single_node(n_bins, m->Info().feature_types.ConstHostSpan(), + column_size, false, OmpGetNumThreads(0)); + m->Info().num_row_ = rows; + + for (auto rank = 0; rank < world; ++rank) { + auto m = RandomDataGenerator{rows, cols, sparsity} + .Seed(rank) + .Type(ft) + .MaxCategory(13) + .Lower(.0f) + .Upper(1.0f) + .GenerateDMatrix(); + if (use_column) { + for (auto const& page : m->GetBatches()) { + PushPage(&sketch_on_single_node, page, m->Info(), hess); } - - HistogramCuts distributed_cuts; - sketch_distributed.MakeCuts(&distributed_cuts); - - // Generate cuts for single node environment - collective::Finalize(); - CHECK_EQ(collective::GetWorldSize(), 1); - std::for_each(column_size.begin(), column_size.end(), [=](auto& size) { size *= world; }); - m->Info().num_row_ = world * rows; - ContainerType sketch_on_single_node( - n_bins, m->Info().feature_types.ConstHostSpan(), column_size, false, OmpGetNumThreads(0)); - m->Info().num_row_ = rows; - - for (auto rank = 0; rank < world; ++rank) { - auto m = RandomDataGenerator{rows, cols, sparsity} - .Seed(rank) - .Type(ft) - .MaxCategory(13) - .Lower(.0f) - .Upper(1.0f) - .GenerateDMatrix(); - if (use_column) { - for (auto const& page : m->GetBatches()) { - PushPage(&sketch_on_single_node, page, m->Info(), hess); - } - } else { - for (auto const& page : m->GetBatches()) { - PushPage(&sketch_on_single_node, page, m->Info(), hess); - } - } + } else { + for (auto const& page : m->GetBatches()) { + PushPage(&sketch_on_single_node, page, m->Info(), hess); } + } + } - HistogramCuts single_node_cuts; - sketch_on_single_node.MakeCuts(&single_node_cuts); + HistogramCuts single_node_cuts; + sketch_on_single_node.MakeCuts(&single_node_cuts); - auto const& sptrs = single_node_cuts.Ptrs(); - auto const& dptrs = distributed_cuts.Ptrs(); - auto const& svals = single_node_cuts.Values(); - auto const& dvals = distributed_cuts.Values(); - auto const& smins = single_node_cuts.MinValues(); - auto const& dmins = distributed_cuts.MinValues(); + auto const& sptrs = single_node_cuts.Ptrs(); + auto const& dptrs = distributed_cuts.Ptrs(); + auto const& svals = single_node_cuts.Values(); + auto const& dvals = distributed_cuts.Values(); + auto const& smins = single_node_cuts.MinValues(); + auto const& dmins = distributed_cuts.MinValues(); - ASSERT_EQ(sptrs.size(), dptrs.size()); - for (size_t i = 0; i < sptrs.size(); ++i) { - ASSERT_EQ(sptrs[i], dptrs[i]) << i; - } - - ASSERT_EQ(svals.size(), dvals.size()); - for (size_t i = 0; i < svals.size(); ++i) { - ASSERT_NEAR(svals[i], dvals[i], 2e-2f); - } + ASSERT_EQ(sptrs.size(), dptrs.size()); + for (size_t i = 0; i < sptrs.size(); ++i) { + ASSERT_EQ(sptrs[i], dptrs[i]) << i; + } - ASSERT_EQ(smins.size(), dmins.size()); - for (size_t i = 0; i < smins.size(); ++i) { - ASSERT_FLOAT_EQ(smins[i], dmins[i]); - } - }); + ASSERT_EQ(svals.size(), dvals.size()); + for (size_t i = 0; i < svals.size(); ++i) { + ASSERT_NEAR(svals[i], dvals[i], 2e-2f); } - for (auto& thread : threads) { - thread.join(); + + ASSERT_EQ(smins.size(), dmins.size()); + for (size_t i = 0; i < smins.size(); ++i) { + ASSERT_FLOAT_EQ(smins[i], dmins[i]); } } +template +void TestDistributedQuantile(size_t const rows, size_t const cols) { + auto constexpr kWorkers = 4; + RunWithInMemoryCommunicator(kWorkers, DoTestDistributedQuantile, kWorkers, rows, cols); +} +} // anonymous namespace + TEST(Quantile, DistributedBasic) { constexpr size_t kRows = 10, kCols = 10; TestDistributedQuantile(kRows, kCols); @@ -180,22 +175,19 @@ TEST(Quantile, SortedDistributed) { TestDistributedQuantile(kRows, kCols); } -TEST(Quantile, SameOnAllWorkers) { - auto constexpr kWorkers = 4; - std::vector threads; - for (auto rank = 0; rank < kWorkers; rank++) { - threads.emplace_back([=]() { - InitInMemoryCommunicator(kWorkers, rank); - - auto world = collective::GetWorldSize(); - if (world != 1) { - ASSERT_EQ(world, kWorkers); - } else { - return; - } +namespace { +void TestSameOnAllWorkers(int32_t workers) { + auto const world = collective::GetWorldSize(); + if (world != 1) { + CHECK_EQ(world, workers); + } else { + LOG(WARNING) << "Skipping Quantile SameOnAllWorkers test"; + return; + } - constexpr size_t kRows = 1000, kCols = 100; - RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const&) { + constexpr size_t kRows = 1000, kCols = 100; + RunWithSeedsAndBins( + kRows, [=](int32_t seed, size_t n_bins, MetaInfo const&) { auto rank = collective::GetRank(); HostDeviceVector storage; std::vector ft(kCols); @@ -211,8 +203,9 @@ TEST(Quantile, SameOnAllWorkers) { .GenerateDMatrix(); auto cuts = SketchOnDMatrix(m.get(), n_bins, common::OmpGetNumThreads(0)); std::vector cut_values(cuts.Values().size() * world, 0); - std::vector::value_type> cut_ptrs( - cuts.Ptrs().size() * world, 0); + std::vector< + typename std::remove_reference_t::value_type> + cut_ptrs(cuts.Ptrs().size() * world, 0); std::vector cut_min_values(cuts.MinValues().size() * world, 0); size_t value_size = cuts.Values().size(); @@ -225,17 +218,18 @@ TEST(Quantile, SameOnAllWorkers) { CHECK_EQ(min_value_size, kCols); size_t value_offset = value_size * rank; - std::copy(cuts.Values().begin(), cuts.Values().end(), cut_values.begin() + value_offset); + std::copy(cuts.Values().begin(), cuts.Values().end(), + cut_values.begin() + value_offset); size_t ptr_offset = ptr_size * rank; - std::copy(cuts.Ptrs().cbegin(), cuts.Ptrs().cend(), cut_ptrs.begin() + ptr_offset); + std::copy(cuts.Ptrs().cbegin(), cuts.Ptrs().cend(), + cut_ptrs.begin() + ptr_offset); size_t min_values_offset = min_value_size * rank; std::copy(cuts.MinValues().cbegin(), cuts.MinValues().cend(), cut_min_values.begin() + min_values_offset); collective::Allreduce(cut_values.data(), cut_values.size()); collective::Allreduce(cut_ptrs.data(), cut_ptrs.size()); - collective::Allreduce(cut_min_values.data(), - cut_min_values.size()); + collective::Allreduce(cut_min_values.data(), cut_min_values.size()); for (int32_t i = 0; i < world; i++) { for (size_t j = 0; j < value_size; ++j) { @@ -254,12 +248,13 @@ TEST(Quantile, SameOnAllWorkers) { } } }); - collective::Finalize(); - }); - } - for (auto& thread : threads) { - thread.join(); - } } +} // anonymous namespace + +TEST(Quantile, SameOnAllWorkers) { + auto constexpr kWorkers = 4; + RunWithInMemoryCommunicator(kWorkers, TestSameOnAllWorkers, kWorkers); +} + } // namespace common } // namespace xgboost diff --git a/tests/cpp/common/test_quantile.cu b/tests/cpp/common/test_quantile.cu index 47610a07ef53..d3f7dbed0f4b 100644 --- a/tests/cpp/common/test_quantile.cu +++ b/tests/cpp/common/test_quantile.cu @@ -1,12 +1,9 @@ #include - -#include - +#include "test_quantile.h" +#include "../helpers.h" #include "../../../src/collective/device_communicator.cuh" #include "../../../src/common/hist_util.cuh" #include "../../../src/common/quantile.cuh" -#include "../helpers.h" -#include "test_quantile.h" namespace xgboost { namespace { @@ -341,162 +338,163 @@ TEST(GPUQuantile, MultiMerge) { }); } +namespace { +void TestAllReduceBasic(int32_t n_gpus) { + auto const world = collective::GetWorldSize(); + if (world != 1) { + ASSERT_EQ(world, n_gpus); + } else { + return; + } + + constexpr size_t kRows = 1000, kCols = 100; + RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const& info) { + // Set up single node version; + HostDeviceVector ft; + SketchContainer sketch_on_single_node(ft, n_bins, kCols, kRows, 0); + + size_t intermediate_num_cuts = std::min( + kRows * world, static_cast(n_bins * WQSketch::kFactor)); + std::vector containers; + for (auto rank = 0; rank < world; ++rank) { + HostDeviceVector storage; + std::string interface_str = RandomDataGenerator{kRows, kCols, 0} + .Device(0) + .Seed(rank + seed) + .GenerateArrayInterface(&storage); + data::CupyAdapter adapter(interface_str); + HostDeviceVector ft; + containers.emplace_back(ft, n_bins, kCols, kRows, 0); + AdapterDeviceSketch(adapter.Value(), n_bins, info, + std::numeric_limits::quiet_NaN(), + &containers.back()); + } + for (auto &sketch : containers) { + sketch.Prune(intermediate_num_cuts); + sketch_on_single_node.Merge(sketch.ColumnsPtr(), sketch.Data()); + sketch_on_single_node.FixError(); + } + sketch_on_single_node.Unique(); + TestQuantileElemRank(0, sketch_on_single_node.Data(), + sketch_on_single_node.ColumnsPtr(), true); + + // Set up distributed version. We rely on using rank as seed to generate + // the exact same copy of data. + auto rank = collective::GetRank(); + SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); + HostDeviceVector storage; + std::string interface_str = RandomDataGenerator{kRows, kCols, 0} + .Device(0) + .Seed(rank + seed) + .GenerateArrayInterface(&storage); + data::CupyAdapter adapter(interface_str); + AdapterDeviceSketch(adapter.Value(), n_bins, info, + std::numeric_limits::quiet_NaN(), + &sketch_distributed); + sketch_distributed.AllReduce(); + sketch_distributed.Unique(); + + ASSERT_EQ(sketch_distributed.ColumnsPtr().size(), + sketch_on_single_node.ColumnsPtr().size()); + ASSERT_EQ(sketch_distributed.Data().size(), + sketch_on_single_node.Data().size()); + + TestQuantileElemRank(0, sketch_distributed.Data(), + sketch_distributed.ColumnsPtr(), true); + + std::vector single_node_data( + sketch_on_single_node.Data().size()); + dh::CopyDeviceSpanToVector(&single_node_data, sketch_on_single_node.Data()); + + std::vector distributed_data(sketch_distributed.Data().size()); + dh::CopyDeviceSpanToVector(&distributed_data, sketch_distributed.Data()); + float Eps = 2e-4 * world; + + for (size_t i = 0; i < single_node_data.size(); ++i) { + ASSERT_NEAR(single_node_data[i].value, distributed_data[i].value, Eps); + ASSERT_NEAR(single_node_data[i].rmax, distributed_data[i].rmax, Eps); + ASSERT_NEAR(single_node_data[i].rmin, distributed_data[i].rmin, Eps); + ASSERT_NEAR(single_node_data[i].wmin, distributed_data[i].wmin, Eps); + } + }); +} +} // anonymous namespace + TEST(GPUQuantile, AllReduceBasic) { auto const n_gpus = AllVisibleGPUs(); - std::vector threads; - for (auto rank = 0; rank < n_gpus; rank++) { - threads.emplace_back([=]() { - InitInMemoryCommunicator(n_gpus, rank); - - auto const world = collective::GetWorldSize(); - if (world != 1) { - ASSERT_EQ(world, n_gpus); - } else { - return; - } + RunWithInMemoryCommunicator(n_gpus, TestAllReduceBasic, n_gpus); +} - constexpr size_t kRows = 1000, kCols = 100; - RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const& info) { - // Set up single node version; - HostDeviceVector ft; - SketchContainer sketch_on_single_node(ft, n_bins, kCols, kRows, 0); - - size_t intermediate_num_cuts = - std::min(kRows * world, static_cast(n_bins * WQSketch::kFactor)); - std::vector containers; - for (auto rank = 0; rank < world; ++rank) { - HostDeviceVector storage; - std::string interface_str = RandomDataGenerator{kRows, kCols, 0} - .Device(0) - .Seed(rank + seed) - .GenerateArrayInterface(&storage); - data::CupyAdapter adapter(interface_str); - HostDeviceVector ft; - containers.emplace_back(ft, n_bins, kCols, kRows, 0); - AdapterDeviceSketch(adapter.Value(), n_bins, info, - std::numeric_limits::quiet_NaN(), &containers.back()); - } - for (auto& sketch : containers) { - sketch.Prune(intermediate_num_cuts); - sketch_on_single_node.Merge(sketch.ColumnsPtr(), sketch.Data()); - sketch_on_single_node.FixError(); - } - sketch_on_single_node.Unique(); - TestQuantileElemRank(0, sketch_on_single_node.Data(), sketch_on_single_node.ColumnsPtr(), - true); - - // Set up distributed version. We rely on using rank as seed to generate - // the exact same copy of data. - auto rank = collective::GetRank(); - SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); - HostDeviceVector storage; - std::string interface_str = RandomDataGenerator{kRows, kCols, 0} - .Device(0) - .Seed(rank + seed) - .GenerateArrayInterface(&storage); - data::CupyAdapter adapter(interface_str); - AdapterDeviceSketch(adapter.Value(), n_bins, info, std::numeric_limits::quiet_NaN(), - &sketch_distributed); - sketch_distributed.AllReduce(); - sketch_distributed.Unique(); - - ASSERT_EQ(sketch_distributed.ColumnsPtr().size(), - sketch_on_single_node.ColumnsPtr().size()); - ASSERT_EQ(sketch_distributed.Data().size(), sketch_on_single_node.Data().size()); - - TestQuantileElemRank(0, sketch_distributed.Data(), sketch_distributed.ColumnsPtr(), true); - - std::vector single_node_data(sketch_on_single_node.Data().size()); - dh::CopyDeviceSpanToVector(&single_node_data, sketch_on_single_node.Data()); - - std::vector distributed_data(sketch_distributed.Data().size()); - dh::CopyDeviceSpanToVector(&distributed_data, sketch_distributed.Data()); - float Eps = 2e-4 * world; - - for (size_t i = 0; i < single_node_data.size(); ++i) { - ASSERT_NEAR(single_node_data[i].value, distributed_data[i].value, Eps); - ASSERT_NEAR(single_node_data[i].rmax, distributed_data[i].rmax, Eps); - ASSERT_NEAR(single_node_data[i].rmin, distributed_data[i].rmin, Eps); - ASSERT_NEAR(single_node_data[i].wmin, distributed_data[i].wmin, Eps); - } - }); - collective::Finalize(); - }); - } - for (auto& thread : threads) { - thread.join(); +namespace { +void TestSameOnAllWorkers(int32_t n_gpus) { + auto world = collective::GetWorldSize(); + if (world != 1) { + ASSERT_EQ(world, n_gpus); + } else { + return; } + + constexpr size_t kRows = 1000, kCols = 100; + RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, + MetaInfo const &info) { + auto rank = collective::GetRank(); + HostDeviceVector ft; + SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); + HostDeviceVector storage; + std::string interface_str = RandomDataGenerator{kRows, kCols, 0} + .Device(0) + .Seed(rank + seed) + .GenerateArrayInterface(&storage); + data::CupyAdapter adapter(interface_str); + AdapterDeviceSketch(adapter.Value(), n_bins, info, + std::numeric_limits::quiet_NaN(), + &sketch_distributed); + sketch_distributed.AllReduce(); + sketch_distributed.Unique(); + TestQuantileElemRank(0, sketch_distributed.Data(), sketch_distributed.ColumnsPtr(), true); + + // Test for all workers having the same sketch. + size_t n_data = sketch_distributed.Data().size(); + collective::Allreduce(&n_data, 1); + ASSERT_EQ(n_data, sketch_distributed.Data().size()); + size_t size_as_float = + sketch_distributed.Data().size_bytes() / sizeof(float); + auto local_data = Span{ + reinterpret_cast(sketch_distributed.Data().data()), + size_as_float}; + + dh::caching_device_vector all_workers(size_as_float * world); + thrust::fill(all_workers.begin(), all_workers.end(), 0); + thrust::copy(thrust::device, local_data.data(), + local_data.data() + local_data.size(), + all_workers.begin() + local_data.size() * rank); + collective::DeviceCommunicator* communicator = collective::Communicator::GetDevice(0); + + communicator->AllReduceSum(all_workers.data().get(), all_workers.size()); + communicator->Synchronize(); + + auto base_line = dh::ToSpan(all_workers).subspan(0, size_as_float); + std::vector h_base_line(base_line.size()); + dh::CopyDeviceSpanToVector(&h_base_line, base_line); + + size_t offset = 0; + for (decltype(world) i = 0; i < world; ++i) { + auto comp = dh::ToSpan(all_workers).subspan(offset, size_as_float); + std::vector h_comp(comp.size()); + dh::CopyDeviceSpanToVector(&h_comp, comp); + ASSERT_EQ(comp.size(), base_line.size()); + for (size_t j = 0; j < h_comp.size(); ++j) { + ASSERT_NEAR(h_base_line[j], h_comp[j], kRtEps); + } + offset += size_as_float; + } + }); } +} // anonymous namespace TEST(GPUQuantile, SameOnAllWorkers) { auto const n_gpus = AllVisibleGPUs(); - std::vector threads; - for (auto rank = 0; rank < n_gpus; rank++) { - threads.emplace_back([=]() { - InitInMemoryCommunicator(n_gpus, rank); - - auto world = collective::GetWorldSize(); - if (world != 1) { - ASSERT_EQ(world, n_gpus); - } else { - return; - } - - constexpr size_t kRows = 1000, kCols = 100; - RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const& info) { - auto rank = collective::GetRank(); - HostDeviceVector ft; - SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); - HostDeviceVector storage; - std::string interface_str = RandomDataGenerator{kRows, kCols, 0} - .Device(0) - .Seed(rank + seed) - .GenerateArrayInterface(&storage); - data::CupyAdapter adapter(interface_str); - AdapterDeviceSketch(adapter.Value(), n_bins, info, std::numeric_limits::quiet_NaN(), - &sketch_distributed); - sketch_distributed.AllReduce(); - sketch_distributed.Unique(); - TestQuantileElemRank(0, sketch_distributed.Data(), sketch_distributed.ColumnsPtr(), true); - - // Test for all workers having the same sketch. - size_t n_data = sketch_distributed.Data().size(); - collective::Allreduce(&n_data, 1); - ASSERT_EQ(n_data, sketch_distributed.Data().size()); - size_t size_as_float = sketch_distributed.Data().size_bytes() / sizeof(float); - auto local_data = Span{ - reinterpret_cast(sketch_distributed.Data().data()), size_as_float}; - - dh::caching_device_vector all_workers(size_as_float * world); - thrust::fill(all_workers.begin(), all_workers.end(), 0); - thrust::copy(thrust::device, local_data.data(), local_data.data() + local_data.size(), - all_workers.begin() + local_data.size() * rank); - collective::DeviceCommunicator* communicator = collective::Communicator::GetDevice(0); - - communicator->AllReduceSum(all_workers.data().get(), all_workers.size()); - communicator->Synchronize(); - - auto base_line = dh::ToSpan(all_workers).subspan(0, size_as_float); - std::vector h_base_line(base_line.size()); - dh::CopyDeviceSpanToVector(&h_base_line, base_line); - - size_t offset = 0; - for (decltype(world) i = 0; i < world; ++i) { - auto comp = dh::ToSpan(all_workers).subspan(offset, size_as_float); - std::vector h_comp(comp.size()); - dh::CopyDeviceSpanToVector(&h_comp, comp); - ASSERT_EQ(comp.size(), base_line.size()); - for (size_t j = 0; j < h_comp.size(); ++j) { - ASSERT_NEAR(h_base_line[j], h_comp[j], kRtEps); - } - offset += size_as_float; - } - }); - }); - } - for (auto& thread : threads) { - thread.join(); - } + RunWithInMemoryCommunicator(n_gpus, TestSameOnAllWorkers, n_gpus); } TEST(GPUQuantile, Push) { diff --git a/tests/cpp/helpers.cc b/tests/cpp/helpers.cc index 865fe1e19488..04c3dd3ad530 100644 --- a/tests/cpp/helpers.cc +++ b/tests/cpp/helpers.cc @@ -15,11 +15,11 @@ #include #include -#include "../../src/collective/communicator-inl.h" #include "../../src/data/adapter.h" #include "../../src/data/iterative_dmatrix.h" #include "../../src/data/simple_dmatrix.h" #include "../../src/data/sparse_page_dmatrix.h" +#include "../../src/gbm/gbtree_model.h" #include "filesystem.h" // dmlc::TemporaryDirectory #include "xgboost/c_api.h" #include "xgboost/predictor.h" @@ -661,13 +661,4 @@ void DeleteRMMResource(RMMAllocator*) {} RMMAllocatorPtr SetUpRMMResourceForCppTests(int, char**) { return {nullptr, DeleteRMMResource}; } #endif // !defined(XGBOOST_USE_RMM) || XGBOOST_USE_RMM != 1 - -void InitInMemoryCommunicator(int32_t word_size, int32_t rank) { - Json config{JsonObject()}; - config["xgboost_communicator"] = String("in-memory"); - config["in_memory_world_size"] = word_size; - config["in_memory_rank"] = rank; - xgboost::collective::Init(config); -} - } // namespace xgboost diff --git a/tests/cpp/helpers.h b/tests/cpp/helpers.h index 01f06b95fb76..31d166ee6ad5 100644 --- a/tests/cpp/helpers.h +++ b/tests/cpp/helpers.h @@ -15,8 +15,10 @@ #include #include #include +#include #include +#include "../../src/collective/communicator-inl.h" #include "../../src/common/common.h" #include "../../src/data/array_interface.h" #include "../../src/gbm/gbtree_model.h" @@ -459,6 +461,25 @@ inline LearnerModelParam MakeMP(bst_feature_t n_features, float base_score, uint return mparam; } -void InitInMemoryCommunicator(int32_t word_size, int32_t rank); +template +void RunWithInMemoryCommunicator(int32_t world_size, Function&& function, Args&&... args) { + std::vector threads; + for (auto rank = 0; rank < world_size; rank++) { + threads.emplace_back([=]() { + Json config{JsonObject()}; + config["xgboost_communicator"] = String("in-memory"); + config["in_memory_world_size"] = world_size; + config["in_memory_rank"] = rank; + xgboost::collective::Init(config); + + std::forward(function)(std::forward(args)...); + + xgboost::collective::Finalize(); + }); + } + for (auto& thread : threads) { + thread.join(); + } +} } // namespace xgboost diff --git a/tests/cpp/predictor/test_cpu_predictor.cc b/tests/cpp/predictor/test_cpu_predictor.cc index 480c79501631..af666432a022 100644 --- a/tests/cpp/predictor/test_cpu_predictor.cc +++ b/tests/cpp/predictor/test_cpu_predictor.cc @@ -90,45 +90,42 @@ TEST(CpuPredictor, Basic) { } } -TEST(CpuPredictor, ColumnSplit) { +namespace { +void TestColumnSplitPredictBatch() { size_t constexpr kRows = 5; size_t constexpr kCols = 5; auto dmat = RandomDataGenerator(kRows, kCols, 0).GenerateDMatrix(); + auto const world_size = collective::GetWorldSize(); + auto const rank = collective::GetRank(); + auto const kSliceSize = (kCols + 1) / world_size; + + auto lparam = CreateEmptyGenericParam(GPUIDX); + std::unique_ptr cpu_predictor = + std::unique_ptr(Predictor::Create("cpu_predictor", &lparam)); + + LearnerModelParam mparam{MakeMP(kCols, .0, 1)}; + + Context ctx; + ctx.UpdateAllowUnknown(Args{}); + gbm::GBTreeModel model = CreateTestModel(&mparam, &ctx); - std::vector threads; - std::int32_t constexpr kWorldSize = 2; - size_t constexpr kSliceSize = (kCols + 1) / kWorldSize; - for (auto rank = 0; rank < kWorldSize; rank++) { - threads.emplace_back([=, &dmat]() { - InitInMemoryCommunicator(kWorldSize, rank); - - auto lparam = CreateEmptyGenericParam(GPUIDX); - std::unique_ptr cpu_predictor = - std::unique_ptr(Predictor::Create("cpu_predictor", &lparam)); - - LearnerModelParam mparam{MakeMP(kCols, .0, 1)}; - - Context ctx; - ctx.UpdateAllowUnknown(Args{}); - gbm::GBTreeModel model = CreateTestModel(&mparam, &ctx); - - // Test predict batch - PredictionCacheEntry out_predictions; - cpu_predictor->InitOutPredictions(dmat->Info(), &out_predictions.predictions, model); - auto sliced = std::unique_ptr{dmat->SliceCol(rank * kSliceSize, kSliceSize)}; - cpu_predictor->PredictBatch(sliced.get(), &out_predictions, model, 0); - - std::vector& out_predictions_h = out_predictions.predictions.HostVector(); - for (size_t i = 0; i < out_predictions.predictions.Size(); i++) { - ASSERT_EQ(out_predictions_h[i], 1.5); - } - xgboost::collective::Finalize(); - }); - } - for (auto& thread : threads) { - thread.join(); + // Test predict batch + PredictionCacheEntry out_predictions; + cpu_predictor->InitOutPredictions(dmat->Info(), &out_predictions.predictions, model); + auto sliced = std::unique_ptr{dmat->SliceCol(rank * kSliceSize, kSliceSize)}; + cpu_predictor->PredictBatch(sliced.get(), &out_predictions, model, 0); + + std::vector& out_predictions_h = out_predictions.predictions.HostVector(); + for (size_t i = 0; i < out_predictions.predictions.Size(); i++) { + ASSERT_EQ(out_predictions_h[i], 1.5); } } +} // anonymous namespace + +TEST(CpuPredictor, ColumnSplit) { + auto constexpr kWorldSize = 2; + RunWithInMemoryCommunicator(kWorldSize, TestColumnSplitPredictBatch); +} TEST(CpuPredictor, IterationRange) { TestIterationRange("cpu_predictor"); diff --git a/tests/pytest.ini b/tests/pytest.ini index 5a0d27a6cec6..fc0a40ff691b 100644 --- a/tests/pytest.ini +++ b/tests/pytest.ini @@ -2,4 +2,3 @@ markers = mgpu: Mark a test that requires multiple GPUs to run. ci: Mark a test that runs only on CI. - gtest: Mark a test that requires C++ Google Test executable. From 5dbffe475c0389fcd99726b715b64a52ec63d25f Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 26 Jan 2023 10:57:44 -0800 Subject: [PATCH 3/4] try to fix gcc-7 --- tests/cpp/helpers.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cpp/helpers.h b/tests/cpp/helpers.h index 31d166ee6ad5..80f504d8ecbd 100644 --- a/tests/cpp/helpers.h +++ b/tests/cpp/helpers.h @@ -465,7 +465,7 @@ template void RunWithInMemoryCommunicator(int32_t world_size, Function&& function, Args&&... args) { std::vector threads; for (auto rank = 0; rank < world_size; rank++) { - threads.emplace_back([=]() { + threads.emplace_back([&, rank]() { Json config{JsonObject()}; config["xgboost_communicator"] = String("in-memory"); config["in_memory_world_size"] = world_size; From 6d7c69b1f9efe1bb1046a3d7972a9609152747a0 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 26 Jan 2023 20:22:49 -0800 Subject: [PATCH 4/4] remove unnecessary checks --- tests/cpp/common/test_quantile.cc | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/tests/cpp/common/test_quantile.cc b/tests/cpp/common/test_quantile.cc index d6b5936ab4e2..bd6932aa3b14 100644 --- a/tests/cpp/common/test_quantile.cc +++ b/tests/cpp/common/test_quantile.cc @@ -42,14 +42,8 @@ void PushPage(HostSketchContainer* container, SparsePage const& page, MetaInfo c } template -void DoTestDistributedQuantile(int32_t workers, size_t rows, size_t cols) { +void DoTestDistributedQuantile(size_t rows, size_t cols) { auto const world = collective::GetWorldSize(); - if (world != 1) { - ASSERT_EQ(world, workers); - } else { - return; - } - std::vector infos(2); auto& h_weights = infos.front().weights_.HostVector(); h_weights.resize(rows); @@ -151,7 +145,7 @@ void DoTestDistributedQuantile(int32_t workers, size_t rows, size_t cols) { template void TestDistributedQuantile(size_t const rows, size_t const cols) { auto constexpr kWorkers = 4; - RunWithInMemoryCommunicator(kWorkers, DoTestDistributedQuantile, kWorkers, rows, cols); + RunWithInMemoryCommunicator(kWorkers, DoTestDistributedQuantile, rows, cols); } } // anonymous namespace @@ -176,15 +170,8 @@ TEST(Quantile, SortedDistributed) { } namespace { -void TestSameOnAllWorkers(int32_t workers) { +void TestSameOnAllWorkers() { auto const world = collective::GetWorldSize(); - if (world != 1) { - CHECK_EQ(world, workers); - } else { - LOG(WARNING) << "Skipping Quantile SameOnAllWorkers test"; - return; - } - constexpr size_t kRows = 1000, kCols = 100; RunWithSeedsAndBins( kRows, [=](int32_t seed, size_t n_bins, MetaInfo const&) { @@ -253,7 +240,7 @@ void TestSameOnAllWorkers(int32_t workers) { TEST(Quantile, SameOnAllWorkers) { auto constexpr kWorkers = 4; - RunWithInMemoryCommunicator(kWorkers, TestSameOnAllWorkers, kWorkers); + RunWithInMemoryCommunicator(kWorkers, TestSameOnAllWorkers); } } // namespace common