diff --git a/demo/dask/dask_learning_to_rank.py b/demo/dask/dask_learning_to_rank.py index 436bae71896c..3567176ba0bf 100644 --- a/demo/dask/dask_learning_to_rank.py +++ b/demo/dask/dask_learning_to_rank.py @@ -123,7 +123,11 @@ def ranking_wo_split_demo(client: Client, args: argparse.Namespace) -> None: ltr = dxgb.DaskXGBRanker(allow_group_split=False) ltr.client = client ltr = ltr.fit( - X_tr, df_tr.y, qid=df_tr.qid, eval_set=[(X_va, df_va.y)], eval_qid=[df_va.qid] + X_tr, + df_tr.y, + qid=df_tr.qid, + eval_set=[(X_tr, df_tr.y), (X_va, df_va.y)], + eval_qid=[df_tr.qid, df_va.qid], ) df_te = df_te.persist() diff --git a/python-package/xgboost/dask/__init__.py b/python-package/xgboost/dask/__init__.py index e15b837abc89..ecf3a20603ea 100644 --- a/python-package/xgboost/dask/__init__.py +++ b/python-package/xgboost/dask/__init__.py @@ -2087,6 +2087,8 @@ def check_ser( assert qe is not None and ye is not None if id(Xe) != X_id: Xe, qe, ye, we, be = no_group_split(Xe, qe, ye, we, be) + else: + Xe, qe, ye, we, be = X, qid, y, sample_weight, base_margin new_eval_set.append((Xe, ye)) new_eval_qid.append(qe) diff --git a/tests/test_distributed/test_with_dask/test_ranking.py b/tests/test_distributed/test_with_dask/test_ranking.py new file mode 100644 index 000000000000..82247786f813 --- /dev/null +++ b/tests/test_distributed/test_with_dask/test_ranking.py @@ -0,0 +1,91 @@ +"""Copyright 2019-2024, XGBoost contributors""" + +import os +from typing import Generator + +import numpy as np +import pytest +import scipy.sparse +from dask import dataframe as dd +from distributed import Client, LocalCluster, Scheduler, Worker +from distributed.utils_test import gen_cluster + +from xgboost import dask as dxgb +from xgboost import testing as tm +from xgboost.testing import dask as dtm + + +@pytest.fixture(scope="module") +def cluster() -> Generator: + n_threads = os.cpu_count() + assert n_threads is not None + with LocalCluster( + n_workers=2, threads_per_worker=n_threads // 2, dashboard_address=":0" + ) as dask_cluster: + yield dask_cluster + + +@pytest.fixture +def client(cluster: LocalCluster) -> Generator: + with Client(cluster) as dask_client: + yield dask_client + + +def test_dask_ranking(client: Client) -> None: + dpath = "demo/rank/" + mq2008 = tm.data.get_mq2008(dpath) + data = [] + for d in mq2008: + if isinstance(d, scipy.sparse.csr_matrix): + d[d == 0] = np.inf + d = d.toarray() + d[d == 0] = np.nan + d[np.isinf(d)] = 0 + data.append(dd.from_array(d, chunksize=32)) + else: + data.append(dd.from_array(d, chunksize=32)) + + ( + x_train, + y_train, + qid_train, + x_test, + y_test, + qid_test, + x_valid, + y_valid, + qid_valid, + ) = data + qid_train = qid_train.astype(np.uint32) + qid_valid = qid_valid.astype(np.uint32) + qid_test = qid_test.astype(np.uint32) + + rank = dxgb.DaskXGBRanker( + n_estimators=2500, eval_metric=["ndcg"], early_stopping_rounds=10 + ) + rank.fit( + x_train, + y_train, + qid=qid_train, + eval_set=[(x_test, y_test), (x_train, y_train)], + eval_qid=[qid_test, qid_train], + verbose=True, + ) + assert rank.n_features_in_ == 46 + assert rank.best_score > 0.98 + + +def test_no_group_split(client: Client) -> None: + X_tr, q_tr, y_tr = dtm.make_ltr(client, 4096, 128, 4, 5) + X_va, q_va, y_va = dtm.make_ltr(client, 1024, 128, 4, 5) + + ltr = dxgb.DaskXGBRanker(allow_group_split=False, n_estimators=32) + ltr.fit( + X_tr, + y_tr, + qid=q_tr, + eval_set=[(X_tr, y_tr), (X_va, y_va)], + eval_qid=[q_tr, q_va], + verbose=True, + ) + print(X_tr.shape, X_tr.columns)