get MG standardization working and tested with Dask and Spark
lijinf2 committed Dec 1, 2023
1 parent 3ffee6d commit 67da440
Showing 2 changed files with 54 additions and 163 deletions.
72 changes: 24 additions & 48 deletions cpp/src/glm/qn_mg.cu
@@ -92,6 +92,7 @@ void standardize_impl(const raft::handle_t& handle,
                        T* input_data,
                        const Matrix::PartDescriptor& input_desc,
                        bool col_major,
+                       bool fit_intercept,
                        T* mean_vector,
                        T* stddev_vector)
 {
@@ -106,50 +107,30 @@ void standardize_impl(const raft::handle_t& handle,
   raft::linalg::multiplyScalar(mean_vector, mean_vector, weight, D, stream);
   comm.allreduce(mean_vector, mean_vector, D, raft::comms::op_t::SUM, stream);
   comm.sync_stream(stream);
-  auto log_mean_str = raft::arr2Str(mean_vector, D, "mean_str", handle.get_stream(), 8);
-  CUML_LOG_DEBUG("rank %d mean vector %s", rank, log_mean_str.c_str());
-
-  // std::vector<T> cpu_mean(D);
-  // raft::copy(cpu_mean.data(), mean_vector, D, stream);
-  // CUML_LOG_DEBUG("rank %d cpu_mean vector %0.8f,%0.8f,%0.8f,%0.8f", rank, cpu_mean[0],
-  // cpu_mean[1], cpu_mean[2], cpu_mean[3]);

   raft::stats::vars(stddev_vector, input_data, mean_vector, D, num_rows, false, !col_major, stream);
   weight = T(1) * num_rows / T(input_desc.M);
   raft::linalg::multiplyScalar(stddev_vector, stddev_vector, weight, D, stream);
   comm.allreduce(stddev_vector, stddev_vector, D, raft::comms::op_t::SUM, stream);
   comm.sync_stream(stream);
-  // auto log_var_str = raft::arr2Str(stddev_vector, D, "var_str", handle.get_stream(), 8);
-  // CUML_LOG_DEBUG("rank %d var vector %s", rank, log_var_str.c_str());
-  // std::vector<T> cpu_var(D);
-  // raft::copy(cpu_var.data(), stddev_vector, D, stream);
-  // CUML_LOG_DEBUG("rank %d cpu_var vector %0.8f,%0.8f,%0.8f", rank, cpu_var[0], cpu_var[1]);

   raft::linalg::sqrt(stddev_vector, stddev_vector, D, handle.get_stream());

-  auto log_std_str = raft::arr2Str(stddev_vector, D, "std_str", handle.get_stream(), 8);
-  CUML_LOG_DEBUG("rank %d std vector %s", rank, log_std_str.c_str());
-  // std::vector<T> cpu_std(D);
-  // raft::copy(cpu_std.data(), stddev_vector, D, stream);
-  // CUML_LOG_DEBUG("rank %d cpu_std vector %0.8f,%0.8f,%0.8f", rank, cpu_std[0], cpu_std[1],
-  // cpu_std[2]);
-
-  raft::stats::meanCenter(
-    input_data, input_data, mean_vector, D, num_rows, !col_major, !col_major, stream);
-  // auto log_stddata_str = raft::arr2Str(input_data, num_rows * D, "stddata", stream);
-  // CUML_LOG_DEBUG("rank %d stddev vector %s", rank, log_stddata_str.c_str());
+  if (fit_intercept) {
+    raft::stats::meanCenter(
+      input_data, input_data, mean_vector, D, num_rows, !col_major, !col_major, stream);
+  }

   raft::matrix::matrixVectorBinaryDivSkipZero(
     input_data, stddev_vector, num_rows, D, !col_major, !col_major, stream);
-  // log_stddata_str = raft::arr2Str(input_data, num_rows * D, "stddata", handle.get_stream());
-  // CUML_LOG_DEBUG("rank %d stddev vector %s", rank, log_stddata_str.c_str());
 }

 template <typename T>
 void undo_standardize_impl(const raft::handle_t& handle,
                            T* input_data,
                            const Matrix::PartDescriptor& input_desc,
                            bool col_major,
+                           bool fit_intercept,
                            T* mean_vector,
                            T* stddev_vector)
 {
@@ -161,8 +142,10 @@ void undo_standardize_impl(const raft::handle_t& handle,
   raft::matrix::matrixVectorBinaryMult(
     input_data, stddev_vector, num_rows, D, !col_major, !col_major, stream);

-  raft::stats::meanAdd(
-    input_data, input_data, mean_vector, D, num_rows, !col_major, !col_major, stream);
+  if (fit_intercept) {
+    raft::stats::meanAdd(
+      input_data, input_data, mean_vector, D, num_rows, !col_major, !col_major, stream);
+  }
 }

 template <typename T>
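
Together these two helpers standardize a row-partitioned matrix across ranks: each rank turns its local mean and variance into weighted partial statistics (weight num_rows / M), allreduce(SUM) combines them into exact global statistics, and with this commit mean-centering is applied only when an intercept is fitted. A minimal single-process NumPy sketch of that logic (an illustration with a made-up mg_standardize helper, not the cuML/RAFT code; the allreduce is simulated by summing over partitions):

import numpy as np

# Minimal sketch of multi-GPU standardization (not the cuML/RAFT code).
# Each element of `parts` plays the role of one rank's row partition;
# allreduce(SUM) is simulated by the Python sum over partitions.
def mg_standardize(parts, fit_intercept):
    M = sum(p.shape[0] for p in parts)  # global number of rows

    # global mean: weighted local means, combined by the "allreduce"
    mean = sum(p.mean(axis=0) * (p.shape[0] / M) for p in parts)

    # global biased variance around the global mean, same reduction
    var = sum(((p - mean) ** 2).mean(axis=0) * (p.shape[0] / M) for p in parts)
    stddev = np.sqrt(var)

    out = []
    for p in parts:
        q = (p - mean) if fit_intercept else p.astype(float)  # center only with intercept
        nz = stddev != 0  # divide-skip-zero, like matrixVectorBinaryDivSkipZero
        q[:, nz] = q[:, nz] / stddev[nz]
        out.append(q)
    return out, mean, stddev

# two "ranks" holding uneven partitions of the same matrix
X = np.arange(12.0).reshape(6, 2)
parts, mean, stddev = mg_standardize([X[:2], X[2:]], fit_intercept=True)
assert np.allclose(np.vstack(parts), (X - X.mean(axis=0)) / X.std(axis=0))

Because each local statistic is pre-scaled by its partition's share of the M global rows, the plain SUM reduction yields the exact global mean and variance no matter how unevenly the rows are partitioned.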
@@ -217,15 +200,6 @@ void qnFit_impl(raft::handle_t& handle,
   auto data_X = input_data[0];
   auto data_y = labels[0];

-  ML::Logger::get().setLevel(pams.verbose);
-  CUML_LOG_DEBUG(
-    "rank %d gets standardization %s", input_desc.rank, standardization ? "true" : "false");
-  // std::cout << "rank " << input_desc.rank << " gets standardization " << standardization <<
-  // std::endl; int num_elements = input_desc.totalElementsOwnedBy(input_desc.rank); auto data_str =
-  // raft::arr2Str(input_data[0]->ptr, num_elements, "data_str", handle.get_stream()); std::cout <<
-  // "rank " << input_desc.rank << " gets c_str " << data_str << std::endl; CUML_LOG_DEBUG("rank %d
-  // gets #elements %d, and data %s", input_desc.rank, num_elements, data_str.c_str());
-
   size_t n_samples = 0;
   for (auto p : input_desc.partsToRanks) {
     n_samples += p->size;
@@ -235,8 +209,13 @@ void qnFit_impl(raft::handle_t& handle,
   rmm::device_uvector<T> mean_vec(input_desc.N, stream);
   rmm::device_uvector<T> stddev_vec(input_desc.N, stream);
   if (standardization) {
-    standardize_impl<T>(
-      handle, data_X->ptr, input_desc, X_col_major, mean_vec.data(), stddev_vec.data());
+    standardize_impl<T>(handle,
+                        data_X->ptr,
+                        input_desc,
+                        X_col_major,
+                        pams.fit_intercept,
+                        mean_vec.data(),
+                        stddev_vec.data());
   }

   qnFit_impl<T>(handle,
@@ -254,10 +233,6 @@ void qnFit_impl(raft::handle_t& handle,
                 input_desc.rank,
                 input_desc.uniqueRanks().size());

-  int n_coefs = (n_classes == 2 ? 1 : n_classes);
-  auto log_coef_str =
-    raft::arr2Str(coef, n_coefs * (input_desc.N + pams.fit_intercept), "coef", stream, 16);
-  CUML_LOG_DEBUG("rank %d gets coefs %s", input_desc.rank, log_coef_str.c_str());
   if (standardization) {
     // adapt intercepts and coefficients to avoid actual standardization in model inference

@@ -291,12 +266,13 @@ void qnFit_impl(raft::handle_t& handle,
       Wbias.assign_gemv(handle, -1, Wweights, false, meanVec, 1, stream);
     }

-    auto log_adaptcoef_str =
-      raft::arr2Str(coef, n_coefs * (input_desc.N + pams.fit_intercept), "adaptcoef", stream, 16);
-    CUML_LOG_DEBUG("rank %d gets adapted coefs %s", input_desc.rank, log_adaptcoef_str.c_str());
-    // TODO: undo standardization
-    undo_standardize_impl<T>(
-      handle, data_X->ptr, input_desc, X_col_major, mean_vec.data(), stddev_vec.data());
+    undo_standardize_impl<T>(handle,
+                             data_X->ptr,
+                             input_desc,
+                             X_col_major,
+                             pams.fit_intercept,
+                             mean_vec.data(),
+                             stddev_vec.data());
   }
 }

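The `if (standardization)` block at the end of qnFit_impl rewrites the fitted weights so that inference can run on raw, unstandardized features, as the in-code comment says. A rough NumPy sketch of the algebra (an assumption-laden illustration: it presumes the collapsed lines above divide Wweights by the standard deviation before the assign_gemv subtracts W·mean from the bias, and the helper name adapt_to_original_scale is made up):

import numpy as np

# Rough sketch of the coefficient adaptation (not the CUDA implementation):
# (w_std, b_std) were learned on standardized features; (w, b) act on raw ones.
def adapt_to_original_scale(w_std, b_std, mean, stddev, fit_intercept):
    w = w_std / np.where(stddev == 0, 1, stddev)  # divide, skipping zero stddev
    # only fold the mean into the bias if the features were actually centered
    b = b_std - w @ mean if fit_intercept else b_std
    return w, b

# check: identical decision values with and without standardized inputs
rng = np.random.RandomState(1)
X = rng.normal(size=(8, 4)) * 100.0 + 7.0
mean, stddev = X.mean(axis=0), X.std(axis=0)
X_std = (X - mean) / stddev

w_std, b_std = rng.normal(size=4), 0.3
w, b = adapt_to_original_scale(w_std, b_std, mean, stddev, fit_intercept=True)
assert np.allclose(X_std @ w_std + b_std, X @ w + b)

This is also why both standardize_impl and undo_standardize_impl now take fit_intercept: when no intercept is fitted the data is only scaled, never centered, so there is no mean term to fold into (or restore from) the bias.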
145 changes: 30 additions & 115 deletions python/cuml/tests/dask/test_dask_logistic_regression.py
@@ -660,66 +660,6 @@ def test_exception_one_label(fit_intercept, client):
         lr.fit(X, y)


-@pytest.mark.mg
-@pytest.mark.parametrize("n_parts", [2])
-@pytest.mark.parametrize("fit_intercept", [True])
-@pytest.mark.parametrize("datatype", [np.float32])
-@pytest.mark.parametrize("delayed", [False])
-def test_standardization_basic(
-    n_parts,
-    fit_intercept,
-    datatype,
-    delayed,
-    client,
-    penalty="l2",
-    l1_ratio=None,
-    C=1.0,
-    n_classes=2,
-):
-    X = np.array([(10, 2), (10, 3), (20, 1), (30, 1)], datatype)
-    y = np.array([1.0, 1.0, 0.0, 0.0], datatype)
-    # X[:, 0] *= 1000  # Scale up the first feature by 1000
-    X_train_dask, y_train_dask = _prep_training_data(client, X, y, n_parts)
-
-    # Logistic Regression without standardization
-    from cuml.dask.linear_model.logistic_regression import (
-        LogisticRegression as cumlLBFGS_dask,
-    )
-
-    mgon = cumlLBFGS_dask(
-        standardization=True,
-        solver="qn",
-        fit_intercept=fit_intercept,
-        penalty=penalty,
-        l1_ratio=l1_ratio,
-        C=C,
-        verbose=True,
-    )
-    mgon.fit(X_train_dask, y_train_dask)
-
-    from sklearn.preprocessing import StandardScaler
-
-    scaler = StandardScaler()
-    scaler.fit(X)
-
-    mgon_coef = mgon.coef_.to_numpy()
-    mgon_intercept = mgon.intercept_.to_numpy()
-
-    mgon_coef_origin = mgon_coef * scaler.scale_
-
-    d = np.dot(mgon_coef, scaler.mean_)
-    # print(f"mgon_coef[0][0] * scaler.mean_[0]: {mgon_coef[0][0] * scaler.mean_[0]}")
-    # print(f"mgon_coef[0][1] * scaler.mean_[1]: {mgon_coef[0][1] * scaler.mean_[1]}")
-    # print(f"scaler.mean_: {scaler.mean_}")
-    # print(f"d: {d}")
-    mgon_intercept_origin = mgon_intercept + d
-    print(f"mgon_coef: {mgon_coef}")
-    print(f"mgon_intercept: {mgon_intercept}")
-    print(f"mgon_coef_origin: {mgon_coef_origin}")
-    print(f"mgon_intercept_origin: {mgon_intercept_origin}")
-    print("finished")
-
-
 @pytest.mark.mg
 @pytest.mark.parametrize("fit_intercept", [False, True])
 @pytest.mark.parametrize(
@@ -760,21 +700,19 @@ def test_standardization_on_normal_dataset(


 @pytest.mark.mg
-# @pytest.mark.parametrize("fit_intercept", [False, True])
-@pytest.mark.parametrize("fit_intercept", [False])
+@pytest.mark.parametrize("fit_intercept", [False, True])
 @pytest.mark.parametrize(
     "regularization",
     [
         ("none", 1.0, None),
-        # ("l2", 2.0, None),
-        # ("l1", 2.0, None),
-        # ("elasticnet", 2.0, 0.2),
+        ("l2", 2.0, None),
+        ("l1", 2.0, None),
+        ("elasticnet", 2.0, 0.2),
     ],
 )
 @pytest.mark.parametrize("datatype", [np.float32])
 @pytest.mark.parametrize("delayed", [False])
-# @pytest.mark.parametrize("ncol_and_nclasses", [(2, 2), (8, 8)])
-@pytest.mark.parametrize("ncol_and_nclasses", [(4, 3)])
+@pytest.mark.parametrize("ncol_and_nclasses", [(2, 2), (6, 4)])
 def test_standardization_on_scaled_dataset(
     fit_intercept, regularization, datatype, delayed, ncol_and_nclasses, client
 ):
@@ -828,7 +766,6 @@ def to_dask_data(X_train, X_test, y_train, y_test):
         penalty=penalty,
         l1_ratio=l1_ratio,
         C=C,
-        verbose=6,
     )
     X_train_dask, X_test_dask, y_train_dask, _ = to_dask_data(
         X_train, X_test, y_train, y_test
@@ -842,7 +779,9 @@ def to_dask_data(X_train, X_test, y_train, y_test):
     assert array_equal(X_train_dask.compute().to_numpy(), X_train)

     # run CPU with StandardScaler
-    scaler = StandardScaler()
+    # if fit_intercept is true, mean center then scale the dataset
+    # if fit_intercept is false, scale the dataset without mean center
+    scaler = StandardScaler(with_mean=fit_intercept, with_std=True)
     scaler.fit(X_train)
     X_train_scaled = scaler.transform(X_train)
     X_test_scaled = scaler.transform(X_test)
@@ -859,6 +798,24 @@ def to_dask_data(X_train, X_test, y_train, y_test):
     cpu_preds = cpu.predict(X_test_scaled)
     cpu_accuracy = accuracy_score(y_test, cpu_preds)

+    assert len(mgon_preds) == len(cpu_preds)
+    assert (mgon_accuracy >= cpu_accuracy) | (
+        np.abs(mgon_accuracy - cpu_accuracy) < 1e-3
+    )
+
+    # assert equal the accuracy and the model
+    mgon_coef_origin = mgon.coef_.to_numpy() * scaler.scale_
+    if fit_intercept is True:
+        mgon_intercept_origin = mgon.intercept_.to_numpy() + np.dot(
+            mgon.coef_.to_numpy(), scaler.mean_
+        )
+    else:
+        mgon_intercept_origin = mgon.intercept_.to_numpy()
+
+    if sk_solver == "lbfgs":
+        assert array_equal(mgon_coef_origin, cpu.coef_, tolerance)
+        assert array_equal(mgon_intercept_origin, cpu.intercept_, tolerance)
+
     # running MG with standardization=False
     mgoff = cumlLBFGS_dask(
         standardization=False,
@@ -867,7 +824,6 @@ def to_dask_data(X_train, X_test, y_train, y_test):
         penalty=penalty,
         l1_ratio=l1_ratio,
         C=C,
-        verbose=6,
     )
     X_train_ds, X_test_ds, y_train_dask, _ = to_dask_data(
         X_train_scaled, X_test_scaled, y_train, y_test
@@ -878,53 +834,12 @@ def to_dask_data(X_train, X_test, y_train, y_test):
     )
     mgoff_accuracy = accuracy_score(y_test, mgoff_preds)

-    print(f"scaler.mean_: {scaler.mean_}")
-    print(f"scaler.scale_: {scaler.scale_}")
-    print(f"mgon accuracy: {mgon_accuracy}")
-    print(f"mgoff accuracy: {mgoff_accuracy}")
-    print(f"cpu accuracy: {cpu_accuracy}")
-    # assert equal the model (in original scale)
-    mgon_coef = mgon.coef_.to_numpy()
-    mgon_coef_origin = mgon_coef * scaler.scale_
-
-    mgon_intercept = mgon.intercept_.to_numpy()
-    if fit_intercept is True:
-        mgon_intercept_origin = mgon_intercept + np.dot(
-            mgon_coef, scaler.mean_
-        )
-    else:
-        mgon_intercept_origin = mgon_intercept
-
-    # print(f"mgon_coef_origin: {mgon_coef_origin}")
-    # print(f"mgoff.coef_.to_numpy(): {mgoff.coef_.to_numpy()}")
-    # print("")
-    # print(f"mgon_intercept_origin: {mgon_intercept_origin}")
-    # print(f"mgoff.intercept_.to_numpy(): {mgoff.intercept_.to_numpy()}")
-    assert array_equal(mgon_coef_origin, mgoff.coef_.to_numpy())
-    assert array_equal(mgon_intercept_origin, mgoff.intercept_.to_numpy())
-
-    if sk_solver == "lbfgs":
-        assert array_equal(mgon_coef_origin, cpu.coef_, tolerance)
-        assert array_equal(mgon_intercept_origin, cpu.intercept_, tolerance)
-
-    # assert equal the accuracy
+    # assert equal the accuracy and the model
     assert len(mgon_preds) == len(mgoff_preds)
-    diff = [
-        i for i in range(len(mgon_preds)) if mgon_preds[i] != mgoff_preds[i]
-    ]
-    print(
-        f"len(diff): {len(diff)} / {len(mgon_preds)} = {len(diff) / 1.0 / len(mgon_preds)}"
-    )
-
-    idx = diff[0]
-    print(
-        f"X_test[idx]: {X_test[idx]}, mgon_preds[idx]: {mgon_preds[idx]}, X_test_scaled[idx]: {X_test_scaled[idx]}, mgoff_preds[idx]: {mgoff_preds[idx]}"
-    )
-
     assert (mgon_accuracy >= mgoff_accuracy) | (
         np.abs(mgon_accuracy - mgoff_accuracy) < 1e-3
     )
-
-    assert (mgon_accuracy >= cpu_accuracy) | (
-        np.abs(mgon_accuracy - cpu_accuracy) < 1e-3
-    )
+    assert array_equal(mgon_coef_origin, mgoff.coef_.to_numpy(), tolerance)
+    assert array_equal(
+        mgon_intercept_origin, mgoff.intercept_.to_numpy(), tolerance
+    )
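
For reference, the identity the updated test relies on: mgon trains on raw features with standardization=True and reports original-scale coefficients, so mapping them into the scaled feature space (coef * scaler.scale_, intercept + coef · scaler.mean_) should reproduce the models trained on pre-scaled data. A small self-check of that mapping (illustrative only; w_orig and b_orig stand in for the coefficients a fitted model would report):

import numpy as np
from sklearn.preprocessing import StandardScaler

rng = np.random.RandomState(0)
X = rng.normal(size=(20, 3)) * np.array([1.0, 50.0, 0.1])

for fit_intercept in (True, False):
    scaler = StandardScaler(with_mean=fit_intercept, with_std=True)
    X_scaled = scaler.fit_transform(X)

    w_orig = rng.normal(size=3)  # stand-in for coef_ in original scale
    b_orig = 0.5 if fit_intercept else 0.0

    # map into the scaled feature space, exactly as the test does
    w_scaled = w_orig * scaler.scale_
    b_scaled = b_orig + (w_orig @ scaler.mean_ if fit_intercept else 0.0)

    # decision values agree under both parameterizations
    assert np.allclose(X @ w_orig + b_orig, X_scaled @ w_scaled + b_scaled)

StandardScaler(with_mean=fit_intercept, with_std=True) mirrors the C++ change above: without an intercept the GPU path only scales the features, so the CPU reference must skip mean-centering as well.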
