Use in-memory communicator to test quantile #8710

Merged · 4 commits · Jan 27, 2023
54 changes: 19 additions & 35 deletions tests/cpp/common/test_quantile.cc
@@ -40,20 +40,10 @@ void PushPage(HostSketchContainer* container, SparsePage const& page, MetaInfo c
Span<float const> hessian) {
container->PushRowPage(page, info, hessian);
}
} // anonymous namespace

template <bool use_column>
void TestDistributedQuantile(size_t rows, size_t cols) {
std::string msg {"Skipping AllReduce test"};
int32_t constexpr kWorkers = 4;
InitCommunicatorContext(msg, kWorkers);
auto world = collective::GetWorldSize();
if (world != 1) {
ASSERT_EQ(world, kWorkers);
} else {
return;
}

void DoTestDistributedQuantile(size_t rows, size_t cols) {
auto const world = collective::GetWorldSize();
std::vector<MetaInfo> infos(2);
auto& h_weights = infos.front().weights_.HostVector();
h_weights.resize(rows);
@@ -152,47 +142,36 @@ void TestDistributedQuantile(size_t rows, size_t cols) {
}
}

template <bool use_column>
void TestDistributedQuantile(size_t const rows, size_t const cols) {
auto constexpr kWorkers = 4;
RunWithInMemoryCommunicator(kWorkers, DoTestDistributedQuantile<use_column>, rows, cols);
}
} // anonymous namespace

TEST(Quantile, DistributedBasic) {
#if defined(__unix__)
constexpr size_t kRows = 10, kCols = 10;
TestDistributedQuantile<false>(kRows, kCols);
#endif
}

TEST(Quantile, Distributed) {
#if defined(__unix__)
constexpr size_t kRows = 4000, kCols = 200;
TestDistributedQuantile<false>(kRows, kCols);
#endif
}

TEST(Quantile, SortedDistributedBasic) {
#if defined(__unix__)
constexpr size_t kRows = 10, kCols = 10;
TestDistributedQuantile<true>(kRows, kCols);
#endif
}

TEST(Quantile, SortedDistributed) {
#if defined(__unix__)
constexpr size_t kRows = 4000, kCols = 200;
TestDistributedQuantile<true>(kRows, kCols);
#endif
}

TEST(Quantile, SameOnAllWorkers) {
#if defined(__unix__)
std::string msg{"Skipping Quantile AllreduceBasic test"};
int32_t constexpr kWorkers = 4;
InitCommunicatorContext(msg, kWorkers);
auto world = collective::GetWorldSize();
if (world != 1) {
CHECK_EQ(world, kWorkers);
} else {
LOG(WARNING) << msg;
return;
}

namespace {
void TestSameOnAllWorkers() {
auto const world = collective::GetWorldSize();
constexpr size_t kRows = 1000, kCols = 100;
RunWithSeedsAndBins(
kRows, [=](int32_t seed, size_t n_bins, MetaInfo const&) {
@@ -256,8 +235,13 @@ TEST(Quantile, SameOnAllWorkers) {
}
}
});
collective::Finalize();
#endif // defined(__unix__)
}
} // anonymous namespace

TEST(Quantile, SameOnAllWorkers) {
auto constexpr kWorkers = 4;
RunWithInMemoryCommunicator(kWorkers, TestSameOnAllWorkers);
}

} // namespace common
} // namespace xgboost
28 changes: 17 additions & 11 deletions tests/cpp/common/test_quantile.cu
@@ -338,12 +338,9 @@ TEST(GPUQuantile, MultiMerge) {
});
}

TEST(GPUQuantile, AllReduceBasic) {
// This test is supposed to run by a python test that setups the environment.
std::string msg {"Skipping AllReduce test"};
auto n_gpus = AllVisibleGPUs();
InitCommunicatorContext(msg, n_gpus);
auto world = collective::GetWorldSize();
namespace {
void TestAllReduceBasic(int32_t n_gpus) {
auto const world = collective::GetWorldSize();
if (world != 1) {
ASSERT_EQ(world, n_gpus);
} else {
@@ -420,13 +417,16 @@ TEST(GPUQuantile, AllReduceBasic) {
ASSERT_NEAR(single_node_data[i].wmin, distributed_data[i].wmin, Eps);
}
});
collective::Finalize();
}
} // anonymous namespace

TEST(GPUQuantile, SameOnAllWorkers) {
std::string msg {"Skipping SameOnAllWorkers test"};
auto n_gpus = AllVisibleGPUs();
InitCommunicatorContext(msg, n_gpus);
TEST(GPUQuantile, AllReduceBasic) {
auto const n_gpus = AllVisibleGPUs();
RunWithInMemoryCommunicator(n_gpus, TestAllReduceBasic, n_gpus);
}

namespace {
void TestSameOnAllWorkers(int32_t n_gpus) {
auto world = collective::GetWorldSize();
if (world != 1) {
ASSERT_EQ(world, n_gpus);
@@ -490,6 +490,12 @@ TEST(GPUQuantile, SameOnAllWorkers) {
}
});
}
} // anonymous namespace

TEST(GPUQuantile, SameOnAllWorkers) {
auto const n_gpus = AllVisibleGPUs();
RunWithInMemoryCommunicator(n_gpus, TestSameOnAllWorkers, n_gpus);
}

TEST(GPUQuantile, Push) {
size_t constexpr kRows = 100;
25 changes: 0 additions & 25 deletions tests/cpp/common/test_quantile.h
@@ -10,31 +10,6 @@

namespace xgboost {
namespace common {
inline void InitCommunicatorContext(std::string msg, int32_t n_workers) {
auto port = std::getenv("DMLC_TRACKER_PORT");
std::string port_str;
if (port) {
port_str = port;
} else {
LOG(WARNING) << msg << " as `DMLC_TRACKER_PORT` is not set up.";
return;
}
auto uri = std::getenv("DMLC_TRACKER_URI");
std::string uri_str;
if (uri) {
uri_str = uri;
} else {
LOG(WARNING) << msg << " as `DMLC_TRACKER_URI` is not set up.";
return;
}

Json config{JsonObject()};
config["DMLC_TRACKER_PORT"] = port_str;
config["DMLC_TRACKER_URI"] = uri_str;
config["DMLC_NUM_WORKER"] = n_workers;
collective::Init(config);
}

template <typename Fn> void RunWithSeedsAndBins(size_t rows, Fn fn) {
std::vector<int32_t> seeds(2);
SimpleLCG lcg;
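The helper removed above required an external tracker reachable through `DMLC_TRACKER_URI`/`DMLC_TRACKER_PORT`, skipping the test when neither was set. With the in-memory communicator each worker thread initializes itself directly. A minimal sketch of that per-worker setup, using the same config keys that `RunWithInMemoryCommunicator` in tests/cpp/helpers.h (below) passes to `collective::Init`:

// Per-worker initialization with the in-memory communicator (sketch only;
// RunWithInMemoryCommunicator below wraps exactly this pattern in threads).
Json config{JsonObject()};
config["xgboost_communicator"] = String("in-memory");
config["in_memory_world_size"] = world_size;  // total number of workers
config["in_memory_rank"] = rank;              // this worker's rank, 0-based
collective::Init(config);
// ... distributed test body runs here ...
collective::Finalize();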
27 changes: 24 additions & 3 deletions tests/cpp/helpers.h
@@ -1,8 +1,7 @@
/**
* Copyright 2016-2023 by XGBoost contributors
*/
#ifndef XGBOOST_TESTS_CPP_HELPERS_H_
#define XGBOOST_TESTS_CPP_HELPERS_H_
#pragma once

#include <gtest/gtest.h>
#include <sys/stat.h>
@@ -16,8 +15,10 @@
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "../../src/collective/communicator-inl.h"
#include "../../src/common/common.h"
#include "../../src/data/array_interface.h"
#include "../../src/gbm/gbtree_model.h"
@@ -460,5 +461,25 @@ inline LearnerModelParam MakeMP(bst_feature_t n_features, float base_score, uint
return mparam;
}

template <typename Function, typename... Args>
void RunWithInMemoryCommunicator(int32_t world_size, Function&& function, Args&&... args) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < world_size; rank++) {
threads.emplace_back([&, rank]() {
Json config{JsonObject()};
config["xgboost_communicator"] = String("in-memory");
config["in_memory_world_size"] = world_size;
config["in_memory_rank"] = rank;
xgboost::collective::Init(config);

std::forward<Function>(function)(std::forward<Args>(args)...);

xgboost::collective::Finalize();
});
}
for (auto& thread : threads) {
thread.join();
}
}

} // namespace xgboost
#endif
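For reference, a minimal sketch of how a test drives the new helper. The test and function names here are hypothetical; `RunWithInMemoryCommunicator`, `collective::GetWorldSize`, and `collective::GetRank` are the names used elsewhere in this diff, and the gtest assertions follow the same in-worker pattern the PR itself uses:

#include <cstdint>

#include <gtest/gtest.h>

#include "../../src/collective/communicator-inl.h"
#include "helpers.h"  // adjust the relative path to tests/cpp/helpers.h as needed

namespace xgboost {
namespace {
// Body executed by every worker thread, after collective::Init has run.
void CheckRankAndWorldSize(std::int32_t expected_world) {
  ASSERT_EQ(collective::GetWorldSize(), expected_world);
  auto const rank = collective::GetRank();
  ASSERT_GE(rank, 0);
  ASSERT_LT(rank, expected_world);
}
}  // anonymous namespace

TEST(Collective, InMemoryBasicSketch) {
  auto constexpr kWorkers = 4;
  // Spawns kWorkers threads; each initializes the in-memory communicator,
  // runs CheckRankAndWorldSize(kWorkers), then finalizes.
  RunWithInMemoryCommunicator(kWorkers, CheckRankAndWorldSize, kWorkers);
}
}  // namespace xgboost

Note that trailing arguments after the callable (here the second `kWorkers`) are forwarded to the function in every worker, which is how the GPU tests above pass `n_gpus` through.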
67 changes: 30 additions & 37 deletions tests/cpp/predictor/test_cpu_predictor.cc
@@ -90,49 +90,42 @@ TEST(CpuPredictor, Basic) {
}
}

TEST(CpuPredictor, ColumnSplit) {
namespace {
void TestColumnSplitPredictBatch() {
size_t constexpr kRows = 5;
size_t constexpr kCols = 5;
auto dmat = RandomDataGenerator(kRows, kCols, 0).GenerateDMatrix();
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
auto const kSliceSize = (kCols + 1) / world_size;

auto lparam = CreateEmptyGenericParam(GPUIDX);
std::unique_ptr<Predictor> cpu_predictor =
std::unique_ptr<Predictor>(Predictor::Create("cpu_predictor", &lparam));

LearnerModelParam mparam{MakeMP(kCols, .0, 1)};

Context ctx;
ctx.UpdateAllowUnknown(Args{});
gbm::GBTreeModel model = CreateTestModel(&mparam, &ctx);

std::vector<std::thread> threads;
std::int32_t constexpr kWorldSize = 2;
size_t constexpr kSliceSize = (kCols + 1) / kWorldSize;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back([=, &dmat]() {
Json config{JsonObject()};
config["xgboost_communicator"] = String("in-memory");
config["in_memory_world_size"] = kWorldSize;
config["in_memory_rank"] = rank;
xgboost::collective::Init(config);

auto lparam = CreateEmptyGenericParam(GPUIDX);
std::unique_ptr<Predictor> cpu_predictor =
std::unique_ptr<Predictor>(Predictor::Create("cpu_predictor", &lparam));

LearnerModelParam mparam{MakeMP(kCols, .0, 1)};

Context ctx;
ctx.UpdateAllowUnknown(Args{});
gbm::GBTreeModel model = CreateTestModel(&mparam, &ctx);

// Test predict batch
PredictionCacheEntry out_predictions;
cpu_predictor->InitOutPredictions(dmat->Info(), &out_predictions.predictions, model);
auto sliced = std::unique_ptr<DMatrix>{dmat->SliceCol(rank * kSliceSize, kSliceSize)};
cpu_predictor->PredictBatch(sliced.get(), &out_predictions, model, 0);

std::vector<float>& out_predictions_h = out_predictions.predictions.HostVector();
for (size_t i = 0; i < out_predictions.predictions.Size(); i++) {
ASSERT_EQ(out_predictions_h[i], 1.5);
}
xgboost::collective::Finalize();
});
}
for (auto& thread : threads) {
thread.join();
// Test predict batch
PredictionCacheEntry out_predictions;
cpu_predictor->InitOutPredictions(dmat->Info(), &out_predictions.predictions, model);
auto sliced = std::unique_ptr<DMatrix>{dmat->SliceCol(rank * kSliceSize, kSliceSize)};
cpu_predictor->PredictBatch(sliced.get(), &out_predictions, model, 0);

std::vector<float>& out_predictions_h = out_predictions.predictions.HostVector();
for (size_t i = 0; i < out_predictions.predictions.Size(); i++) {
ASSERT_EQ(out_predictions_h[i], 1.5);
}
}
} // anonymous namespace

TEST(CpuPredictor, ColumnSplit) {
auto constexpr kWorldSize = 2;
RunWithInMemoryCommunicator(kWorldSize, TestColumnSplitPredictBatch);
}
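A side note on the slicing arithmetic above: with `kCols = 5` and `kWorldSize = 2`, `kSliceSize = (5 + 1) / 2 = 3`, so rank 0 reads columns [0, 3) and rank 1 reads [3, 5), assuming `SliceCol` clips the last slice at the matrix width. A standalone sketch of that computation, illustrative only:

#include <algorithm>
#include <cstddef>
#include <iostream>

// Illustrative only: mirrors how TestColumnSplitPredictBatch above splits
// columns across workers. SliceCol(begin, size) is assumed to clip at kCols.
int main() {
  std::size_t constexpr kCols = 5;
  int constexpr kWorldSize = 2;
  std::size_t const kSliceSize = (kCols + 1) / kWorldSize;  // = 3 here
  for (int rank = 0; rank < kWorldSize; ++rank) {
    std::size_t const begin = rank * kSliceSize;
    std::size_t const end = std::min(begin + kSliceSize, kCols);
    std::cout << "rank " << rank << ": columns [" << begin << ", " << end << ")\n";
  }
  return 0;
}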

TEST(CpuPredictor, IterationRange) {
TestIterationRange("cpu_predictor");
1 change: 0 additions & 1 deletion tests/pytest.ini
@@ -2,4 +2,3 @@
markers =
mgpu: Mark a test that requires multiple GPUs to run.
ci: Mark a test that runs only on CI.
gtest: Mark a test that requires C++ Google Test executable.
43 changes: 0 additions & 43 deletions tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
@@ -486,49 +486,6 @@ def test_interface_consistency(self) -> None:
for rn, drn in zip(ranker_names, dranker_names):
assert rn == drn

def run_quantile(self, name: str, local_cuda_client: Client) -> None:
exe = None
for possible_path in {
"./testxgboost",
"./build/testxgboost",
"../build/testxgboost",
"../gpu-build/testxgboost",
}:
if os.path.exists(possible_path):
exe = possible_path
assert exe, "No testxgboost executable found."
test = "--gtest_filter=GPUQuantile." + name

def runit(
worker_addr: str, rabit_args: Dict[str, Union[int, str]]
) -> subprocess.CompletedProcess:
# setup environment for running the c++ part.
env = os.environ.copy()
env['DMLC_TRACKER_PORT'] = str(rabit_args['DMLC_TRACKER_PORT'])
env["DMLC_TRACKER_URI"] = str(rabit_args["DMLC_TRACKER_URI"])
return subprocess.run([str(exe), test], env=env, stdout=subprocess.PIPE)

workers = tm.get_client_workers(local_cuda_client)
rabit_args = local_cuda_client.sync(
dxgb._get_rabit_args, len(workers), None, local_cuda_client
)
futures = local_cuda_client.map(
runit, workers, pure=False, workers=workers, rabit_args=rabit_args
)
results = local_cuda_client.gather(futures)
for ret in results:
msg = ret.stdout.decode("utf-8")
assert msg.find("1 test from GPUQuantile") != -1, msg
assert ret.returncode == 0, msg

@pytest.mark.gtest
def test_quantile_basic(self, local_cuda_client: Client) -> None:
self.run_quantile("AllReduceBasic", local_cuda_client)

@pytest.mark.gtest
def test_quantile_same_on_all_workers(self, local_cuda_client: Client) -> None:
self.run_quantile("SameOnAllWorkers", local_cuda_client)


@pytest.mark.skipif(**tm.no_cupy())
def test_with_asyncio(local_cuda_client: Client) -> None: