From 320a2253e44a46b1c9eeb63f2dada1edd600c806 Mon Sep 17 00:00:00 2001 From: Yuexiang XIE Date: Thu, 16 Feb 2023 20:23:37 +0800 Subject: [PATCH] Add random forest in VFL (#523) - develop random forest for VFL - enable feature protection in rf - modify unit tests --- .../autotune/baseline/fedhpo_vfl.yaml | 2 + .../core/auxiliaries/model_builder.py | 4 +- .../core/auxiliaries/worker_builder.py | 4 +- federatedscope/core/configs/cfg_fl_setting.py | 15 +- .../vertical_fl/dataloader/dataloader.py | 1 + .../vertical_fl/dataloader/utils.py | 57 ++- federatedscope/vertical_fl/dataset/abalone.py | 11 +- federatedscope/vertical_fl/dataset/adult.py | 14 +- federatedscope/vertical_fl/dataset/blog.py | 11 +- federatedscope/vertical_fl/dataset/credit.py | 11 +- federatedscope/vertical_fl/loss/binary_cls.py | 40 +- federatedscope/vertical_fl/loss/regression.py | 32 +- federatedscope/vertical_fl/loss/utils.py | 8 +- federatedscope/vertical_fl/model/Tree.py | 113 ++++- federatedscope/vertical_fl/model/__init__.py | 4 +- .../vertical_fl/model/model_builder.py | 9 +- .../vertical_fl/trainer/__init__.py | 9 +- .../feature_order_protected_trainer.py | 471 +++++++++--------- .../trainer/random_forest_trainer.py | 83 +++ federatedscope/vertical_fl/trainer/trainer.py | 137 +++-- federatedscope/vertical_fl/trainer/utils.py | 28 +- federatedscope/vertical_fl/utils.py | 4 +- .../xgb_base/baseline/gbdt_base_on_adult.yaml | 1 + .../xgb_base/baseline/rf_base_on_adult.yaml | 32 ++ .../baseline/xgb_base_on_abalone.yaml | 1 + .../xgb_base/baseline/xgb_base_on_adult.yaml | 1 + .../baseline/xgb_base_on_blogfeedback.yaml | 1 + .../xgb_base_on_givemesomecredit.yaml | 1 + .../xgb_feature_order_dp_on_adult.yaml | 1 + .../xgb_feature_order_op_boost_on_adult.yaml | 1 + .../vertical_fl/xgb_base/worker/XGBClient.py | 40 +- .../xgb_base/worker/evaluation_wrapper.py | 22 +- .../xgb_base/worker/train_wrapper.py | 25 +- ...dt.py => test_tree_based_model_for_vfl.py} | 98 +++- 34 files changed, 915 insertions(+), 377 deletions(-) create mode 100644 federatedscope/vertical_fl/trainer/random_forest_trainer.py create mode 100644 federatedscope/vertical_fl/xgb_base/baseline/rf_base_on_adult.yaml rename tests/{test_xgb_and_gbdt.py => test_tree_based_model_for_vfl.py} (83%) diff --git a/federatedscope/autotune/baseline/fedhpo_vfl.yaml b/federatedscope/autotune/baseline/fedhpo_vfl.yaml index 57f44275e..56c891864 100644 --- a/federatedscope/autotune/baseline/fedhpo_vfl.yaml +++ b/federatedscope/autotune/baseline/fedhpo_vfl.yaml @@ -35,6 +35,8 @@ vertical: key_size: 256 dims: [7, 14] algo: 'xgb' + data_size_for_debug: 1500 + feature_subsample_ratio: 1.0 eval: freq: 5 best_res_update_round_wise_key: test_loss diff --git a/federatedscope/core/auxiliaries/model_builder.py b/federatedscope/core/auxiliaries/model_builder.py index b937348ad..085017c7a 100644 --- a/federatedscope/core/auxiliaries/model_builder.py +++ b/federatedscope/core/auxiliaries/model_builder.py @@ -185,7 +185,9 @@ def get_model(model_config, local_data=None, backend='torch'): elif model_config.type.lower() in ['vmfnet', 'hmfnet']: from federatedscope.mf.model.model_builder import get_mfnet model = get_mfnet(model_config, input_shape) - elif model_config.type.lower() in ['xgb_tree', 'gbdt_tree']: + elif model_config.type.lower() in [ + 'xgb_tree', 'gbdt_tree', 'random_forest' + ]: from federatedscope.vertical_fl.model.model_builder import \ get_tree_model model = get_tree_model(model_config) diff --git a/federatedscope/core/auxiliaries/worker_builder.py
b/federatedscope/core/auxiliaries/worker_builder.py index 92d2d3353..be09e51c3 100644 --- a/federatedscope/core/auxiliaries/worker_builder.py +++ b/federatedscope/core/auxiliaries/worker_builder.py @@ -60,7 +60,7 @@ def get_client_cls(cfg): if cfg.vertical.algo == 'lr': from federatedscope.vertical_fl.worker import vFLClient return vFLClient - elif cfg.vertical.algo in ['xgb', 'gbdt']: + elif cfg.vertical.algo in ['xgb', 'gbdt', 'rf']: from federatedscope.vertical_fl.xgb_base.worker import XGBClient return XGBClient else: @@ -173,7 +173,7 @@ def get_server_cls(cfg): if cfg.vertical.algo == 'lr': from federatedscope.vertical_fl.worker import vFLServer return vFLServer - elif cfg.vertical.algo in ['xgb', 'gbdt']: + elif cfg.vertical.algo in ['xgb', 'gbdt', 'rf']: from federatedscope.vertical_fl.xgb_base.worker import XGBServer return XGBServer else: diff --git a/federatedscope/core/configs/cfg_fl_setting.py b/federatedscope/core/configs/cfg_fl_setting.py index 796bb89a8..e7651b3c2 100644 --- a/federatedscope/core/configs/cfg_fl_setting.py +++ b/federatedscope/core/configs/cfg_fl_setting.py @@ -78,11 +78,16 @@ def extend_fl_setting_cfg(cfg): cfg.vertical.dims = [5, 10] # TODO: we need to explain dims cfg.vertical.encryption = 'paillier' cfg.vertical.key_size = 3072 - cfg.vertical.algo = 'lr' # ['lr', 'xgb'] + cfg.vertical.algo = 'lr' # ['lr', 'xgb', 'gbdt', 'rf'] + cfg.vertical.feature_subsample_ratio = 1.0 cfg.vertical.protect_object = '' # feature_order, TODO: add more - cfg.vertical.protect_method = '' # dp + cfg.vertical.protect_method = '' # dp, op_boost cfg.vertical.protect_args = [] # Default values for 'dp': {'bucket_num':100, 'epsilon':None} + # Default values for 'op_boost': {'algo':'global', 'lower_bound':1, + # 'upper_bound':100, 'epsilon':2} + cfg.vertical.data_size_for_debug = 0 # use a subset for debug in vfl, + # 0 indicates using the entire dataset (disable debug mode) # --------------- register corresponding check function ---------- cfg.register_cfg_check_fun(assert_fl_setting_cfg) @@ -230,5 +235,11 @@ def assert_fl_setting_cfg(cfg): f"cfg.model.type is changed to 'gbdt_tree' here") cfg.model.type = 'gbdt_tree' + if not (cfg.vertical.feature_subsample_ratio > 0 + and cfg.vertical.feature_subsample_ratio <= 1.0): + raise ValueError(f'The value of vertical.feature_subsample_ratio ' + f'must be in (0, 1.0], but got ' + f'{cfg.vertical.feature_subsample_ratio}') + register_config("fl_setting", extend_fl_setting_cfg) diff --git a/federatedscope/vertical_fl/dataloader/dataloader.py b/federatedscope/vertical_fl/dataloader/dataloader.py index 774a9c62c..5500cd097 100644 --- a/federatedscope/vertical_fl/dataloader/dataloader.py +++ b/federatedscope/vertical_fl/dataloader/dataloader.py @@ -37,6 +37,7 @@ def load_vertical_data(config=None, generate=False): feature_partition=config.vertical.dims, tr_frac=config.data.splits[0], algo=config.vertical.algo, + debug_size=config.vertical.data_size_for_debug, download=True, seed=1234, args=args) diff --git a/federatedscope/vertical_fl/dataloader/utils.py b/federatedscope/vertical_fl/dataloader/utils.py index 96788c665..7717a4067 100644 --- a/federatedscope/vertical_fl/dataloader/utils.py +++ b/federatedscope/vertical_fl/dataloader/utils.py @@ -11,7 +11,7 @@ def batch_iter(data, batch_size, shuffled=True): batch_size (int): the batch size shuffled (bool): whether to shuffle the data at the start of each epoch :returns: sample index, batch of x, batch_of y - :rtype: int, ndarray, ndarry + :rtype: int, ndarray, ndarray """ assert 'x' in data and 
'y' in data @@ -28,3 +28,58 @@ def batch_iter(data, batch_size, shuffled=True): end_index = min(data_size, (batch + 1) * batch_size) sample_index = shuffled_index[start_index:end_index] yield sample_index, data_x[sample_index], data_y[sample_index] + + +class VerticalDataSampler(object): + """ + VerticalDataSampler is used to sample a subset from data + + Arguments: + data(dict): data + replace (bool): Whether the sample is with or without replacement + """ + def __init__(self, + data, + replace=False, + use_full_trainset=True, + feature_frac=1.0): + assert 'x' in data + self.data_x = data['x'] + self.data_y = data['y'] if 'y' in data else None + self.data_size = self.data_x.shape[0] + self.feature_size = self.data_x.shape[1] + self.replace = replace + self.use_full_trainset = use_full_trainset + self.selected_feature_num = max(1, + int(self.feature_size * feature_frac)) + self.selected_feature_index = None + + def sample_data(self, sample_size, index=None): + + # use the entire dataset + if self.use_full_trainset: + return range(len(self.data_x)), self.data_x, self.data_y + + if index is not None: + sampled_x = self.data_x[index] + sampled_y = self.data_y[index] if self.data_y is not None else None + else: + sample_size = min(sample_size, self.data_size) + index = np.random.choice(a=self.data_size, + size=sample_size, + replace=self.replace) + sampled_x = self.data_x[index] + sampled_y = self.data_y[index] if self.data_y is not None else None + + return index, sampled_x, sampled_y + + def sample_feature(self, x): + if self.selected_feature_num == self.feature_size: + return range(x.shape[-1]), x + else: + feature_index = np.random.choice(a=self.feature_size, + size=self.selected_feature_num, + replace=False) + self.selected_feature_index = feature_index + + return feature_index, x[:, feature_index] diff --git a/federatedscope/vertical_fl/dataset/abalone.py b/federatedscope/vertical_fl/dataset/abalone.py index aaa823b93..c71617a86 100644 --- a/federatedscope/vertical_fl/dataset/abalone.py +++ b/federatedscope/vertical_fl/dataset/abalone.py @@ -2,6 +2,7 @@ import os import os.path as osp +import numpy as np import pandas as pd from torchvision.datasets.utils import download_and_extract_archive @@ -43,7 +44,9 @@ class Abalone(object): args (dict): set Ture or False to decide whether to normalize or standardize the data or not, e.g., {'normalization': False, 'standardization': False} - algo(str): the running model, 'lr' or 'xgb' + algo(str): the running model, 'lr'/'xgb'/'gbdt'/'rf' + debug_size(int): use a subset for debug, + 0 for using entire dataset download (bool): indicator to download dataset seed: a random seed """ @@ -58,6 +61,7 @@ def __init__(self, args, algo=None, tr_frac=0.8, + debug_size=0, download=True, seed=123): self.root = root @@ -67,6 +71,7 @@ def __init__(self, self.seed = seed self.args = args self.algo = algo + self.data_size_for_debug = debug_size self.data_dict = {} self.data = {} @@ -84,6 +89,10 @@ def _get_data(self): file = osp.join(fpath, self.raw_file) data = self._read_raw(file) data = self._process(data) + if self.data_size_for_debug != 0: + subset_size = min(len(data), self.data_size_for_debug) + np.random.shuffle(data) + data = data[:subset_size] train_num = int(self.tr_frac * len(data)) self.data_dict['train'] = data[:train_num] self.data_dict['test'] = data[train_num:] diff --git a/federatedscope/vertical_fl/dataset/adult.py b/federatedscope/vertical_fl/dataset/adult.py index 653fa5bf3..8065dcd24 100644 --- a/federatedscope/vertical_fl/dataset/adult.py +++ 
b/federatedscope/vertical_fl/dataset/adult.py @@ -31,7 +31,9 @@ class Adult(object): args (dict): set Ture or False to decide whether to normalize or standardize the data or not, e.g., {'normalization': False, 'standardization': False} - algo(str): the running model, 'lr' or 'xgb' + algo(str): the running model, 'lr'/'xgb'/'gbdt'/'rf' + debug_size(int): use a subset for debug, + 0 for using entire dataset download (bool): indicator to download dataset seed: a random seed """ @@ -46,6 +48,7 @@ def __init__(self, args, algo=None, tr_frac=0.8, + debug_size=0, download=True, seed=123): super(Adult, self).__init__() @@ -56,6 +59,7 @@ def __init__(self, self.seed = seed self.args = args self.algo = algo + self.data_size_for_debug = debug_size self.data_dict = {} self.data = {} @@ -70,6 +74,10 @@ def _get_data(self): train_data = self._read_raw(train_file) test_data = self._read_raw(test_file) train_data, test_data = self._process(train_data, test_data) + if self.data_size_for_debug != 0: + subset_size = min(len(train_data), self.data_size_for_debug) + np.random.shuffle(train_data) + train_data = train_data[:subset_size] self._partition_data(train_data, test_data) def _read_raw(self, file_path): @@ -102,6 +110,8 @@ def _process(self, train_set, test_set): train_set = combined_set[:train_set.shape[0]] test_set = combined_set[train_set.shape[0]:] + train_set = train_set.values + test_set = test_set.values return train_set, test_set # normalization @@ -116,8 +126,6 @@ def standardization(self, data): return (data - mu) / sigma def _partition_data(self, train_set, test_set): - train_set = train_set.values - test_set = test_set.values x, y = train_set[:, :-1], train_set[:, -1] test_x, test_y = test_set[:, :-1], test_set[:, -1] diff --git a/federatedscope/vertical_fl/dataset/blog.py b/federatedscope/vertical_fl/dataset/blog.py index 601f12488..92228e37f 100644 --- a/federatedscope/vertical_fl/dataset/blog.py +++ b/federatedscope/vertical_fl/dataset/blog.py @@ -45,7 +45,9 @@ class Blog(object): args (dict): set Ture or False to decide whether to normalize or standardize the data or not, e.g., {'normalization': False, 'standardization': False} - algo(str): the running model, 'lr' or 'xgb' + algo(str): the running model, 'lr'/'xgb'/'gbdt'/'rf' + debug_size(int): use a subset for debug, + 0 for using entire dataset download (bool): indicator to download dataset seed: a random seed """ @@ -60,6 +62,7 @@ def __init__(self, args, algo=None, tr_frac=0.8, + debug_size=0, download=True, seed=123): super(Blog, self).__init__() @@ -70,6 +73,7 @@ def __init__(self, self.seed = seed self.args = args self.algo = algo + self.data_size_for_debug = debug_size self.data_dict = {} self.data = {} @@ -98,6 +102,11 @@ def _get_data(self): else: test_data = np.concatenate((test_data, f_data), axis=0) + if self.data_size_for_debug != 0: + subset_size = min(len(train_data), self.data_size_for_debug) + np.random.shuffle(train_data) + train_data = train_data[:subset_size] + self.data_dict['train'] = train_data self.data_dict['test'] = test_data diff --git a/federatedscope/vertical_fl/dataset/credit.py b/federatedscope/vertical_fl/dataset/credit.py index 76d65fb9b..6af688172 100644 --- a/federatedscope/vertical_fl/dataset/credit.py +++ b/federatedscope/vertical_fl/dataset/credit.py @@ -26,7 +26,9 @@ class Credit(object): args (dict): set Ture or False to decide whether to normalize or standardize the data or not, e.g., {'normalization': False, 'standardization': False} - algo(str): the running model, 'lr' or 'xgb' + algo(str): the 
running model, 'lr'/'xgb'/'gbdt'/'rf' + debug_size(int): use a subset for debug, + 0 for using entire dataset download (bool): indicator to download dataset seed: a random seed """ @@ -41,6 +43,7 @@ def __init__(self, args, algo=None, tr_frac=0.8, + debug_size=0, download=True, seed=123): super(Credit, self).__init__() @@ -51,6 +54,7 @@ def __init__(self, self.seed = seed self.args = args self.algo = algo + self.data_size_for_debug = debug_size self.data_dict = {} self.data = {} @@ -90,6 +94,11 @@ def balance_sample(sample_size, y): data = data[sample_idx] # ''' + if self.data_size_for_debug != 0: + subset_size = min(len(data), self.data_size_for_debug) + np.random.shuffle(data) + data = data[:subset_size] + train_num = int(self.tr_frac * len(data)) self.data_dict['train'] = data[:train_num] diff --git a/federatedscope/vertical_fl/loss/binary_cls.py b/federatedscope/vertical_fl/loss/binary_cls.py index da2338155..aa95e8840 100644 --- a/federatedscope/vertical_fl/loss/binary_cls.py +++ b/federatedscope/vertical_fl/loss/binary_cls.py @@ -6,25 +6,39 @@ class BinaryClsLoss(object): y = {1, 0} L = -yln(p)-(1-y)ln(1-p) """ - def __init__(self, cal_hess=True): - self.cal_hess = cal_hess + def __init__(self, model_type): + self.cal_hess = model_type in ['xgb_tree'] + self.cal_sigmoid = model_type in ['xgb_tree', 'gbdt_tree'] + self.merged_mode = 'mean' if model_type in ['random_forest'] else 'sum' + + def _sigmoid(self, y_pred): + return 1.0 / (1.0 + np.exp(-y_pred)) + + def _process_y_pred(self, y_pred): + if self.merged_mode == 'mean': + y_pred = np.mean(y_pred, axis=0) + else: + y_pred = np.sum(y_pred, axis=0) + + if self.cal_sigmoid: + y_pred = self._sigmoid(y_pred) + + return y_pred def get_metric(self, y, y_pred): - pred_prob = 1.0 / (1.0 + np.exp(-y_pred)) - pred_prob[pred_prob >= 0.5] = 1. 
- pred_prob[pred_prob < 0.5] = 0 - acc = np.sum(pred_prob == y) / len(y) + y_pred = self._process_y_pred(y_pred) + y_pred = (y_pred >= 0.5).astype(np.float32) + acc = np.sum(y_pred == y) / len(y) return {'acc': acc} def get_loss(self, y, y_pred): - y_pred = 1.0 / (1.0 + np.exp(-y_pred)) - res = np.mean(-y * np.log(y_pred)) + y_pred = self._process_y_pred(y_pred) + res = np.mean(-y * np.log(y_pred + 1e-7)) return res - def get_grad_and_hess(self, y, pred): - pred = np.asarray(pred) + def get_grad_and_hess(self, y, y_pred): + y_pred = self._process_y_pred(y_pred) y = np.array(y) - prob = 1.0 / (1.0 + np.exp(-pred)) - grad = prob - y - hess = prob * (1.0 - prob) if self.cal_hess else None + grad = y_pred - y + hess = y_pred * (1.0 - y_pred) if self.cal_hess else None return grad, hess diff --git a/federatedscope/vertical_fl/loss/regression.py b/federatedscope/vertical_fl/loss/regression.py index 60ff156f8..3ef6c37ac 100644 --- a/federatedscope/vertical_fl/loss/regression.py +++ b/federatedscope/vertical_fl/loss/regression.py @@ -2,16 +2,28 @@ class RegressionMAELoss(object): - def __init__(self, cal_hess=True): - self.cal_hess = cal_hess + def __init__(self, model_type): + self.cal_hess = model_type in ['xgb_tree'] + self.merged_mode = 'mean' if model_type in ['random_forest'] else 'sum' + + def _process_y_pred(self, y_pred): + if self.merged_mode == 'mean': + y_pred = np.mean(y_pred, axis=0) + else: + y_pred = np.sum(y_pred, axis=0) + + return y_pred def get_metric(self, y, y_pred): + y_pred = self._process_y_pred(y_pred) return {'mae': np.mean(np.abs(y - y_pred))} def get_loss(self, y, y_pred): + y_pred = self._process_y_pred(y_pred) return np.mean(np.abs(y - y_pred)) def get_grad_and_hess(self, y, y_pred): + y_pred = self._process_y_pred(y_pred) x = y_pred - y grad = np.sign(x) hess = np.zeros_like(x) if self.cal_hess else None @@ -19,16 +31,28 @@ def get_grad_and_hess(self, y, y_pred): class RegressionMSELoss(object): - def __init__(self, cal_hess=True): - self.cal_hess = cal_hess + def __init__(self, model_type): + self.cal_hess = model_type in ['xgb_tree'] + self.merged_mode = 'mean' if model_type in ['random_forest'] else 'sum' + + def _process_y_pred(self, y_pred): + if self.merged_mode == 'mean': + y_pred = np.mean(y_pred, axis=0) + else: + y_pred = np.sum(y_pred, axis=0) + + return y_pred def get_metric(self, y, y_pred): + y_pred = self._process_y_pred(y_pred) return {'mse': np.mean((y - y_pred)**2)} def get_loss(self, y, y_pred): + y_pred = self._process_y_pred(y_pred) return np.mean((y - y_pred)**2) def get_grad_and_hess(self, y, y_pred): + y_pred = self._process_y_pred(y_pred) x = y_pred - y grad = x hess = np.ones_like(x) if self.cal_hess else None diff --git a/federatedscope/vertical_fl/loss/utils.py b/federatedscope/vertical_fl/loss/utils.py index b1770b6d5..af95a68fd 100644 --- a/federatedscope/vertical_fl/loss/utils.py +++ b/federatedscope/vertical_fl/loss/utils.py @@ -1,10 +1,10 @@ -def get_vertical_loss(loss_type, cal_hess=True): +def get_vertical_loss(loss_type, model_type): if loss_type == 'CrossEntropyLoss': from federatedscope.vertical_fl.loss import BinaryClsLoss - return BinaryClsLoss(cal_hess=cal_hess) + return BinaryClsLoss(model_type=model_type) elif loss_type == 'RegressionMSELoss': from federatedscope.vertical_fl.loss import RegressionMSELoss - return RegressionMSELoss(cal_hess=cal_hess) + return RegressionMSELoss(model_type=model_type) elif loss_type == 'RegressionMAELoss': from federatedscope.vertical_fl.loss import RegressionMAELoss - return 
RegressionMAELoss(cal_hess=cal_hess) + return RegressionMAELoss(model_type=model_type) diff --git a/federatedscope/vertical_fl/model/Tree.py b/federatedscope/vertical_fl/model/Tree.py index 4b28c8fa7..9f9a18789 100644 --- a/federatedscope/vertical_fl/model/Tree.py +++ b/federatedscope/vertical_fl/model/Tree.py @@ -9,7 +9,8 @@ def __init__(self, weight=None, grad=None, hess=None, - indicator=None): + indicator=None, + label=None): self.member = None self.status = status self.feature_idx = feature_idx @@ -19,6 +20,7 @@ def __init__(self, self.grad = grad self.hess = hess self.indicator = indicator + self.label = label class Tree(object): @@ -112,6 +114,89 @@ def update_child(self, node_num, left_child, right_child): 2].indicator = self.tree[node_num].indicator * right_child +class DecisionTree(Tree): + def __init__(self, max_depth, lambda_, gamma): + super().__init__(max_depth, lambda_, gamma) + self.task_type = None # ['classification', 'regression'] + + def _gini(self, indicator, y): + total_num = np.sum(indicator) + positive_num = np.dot(indicator, y) + negative_num = total_num - positive_num + return 1 - (positive_num / total_num)**2 - (negative_num / + total_num)**2 + + def _check_same_label(self, y, indicator): + active_idx = np.nonzero(indicator)[0] + active_y = y[active_idx] + if np.sum(active_y) in [0, len(active_y)]: + return True + return False + + def cal_gini(self, split_idx, y, indicator): + if self._check_same_label(y, indicator): + # Return the maximum gini value + return 1.0 + + left_child_indicator = indicator * np.concatenate( + [np.ones(split_idx), + np.zeros(len(y) - split_idx)]) + right_child_indicator = indicator - left_child_indicator + left_gini = self._gini(left_child_indicator, y) + right_gini = self._gini(right_child_indicator, y) + total_num = np.sum(indicator) + return np.sum(left_child_indicator) / total_num * left_gini + sum( + right_child_indicator) / total_num * right_gini + + def cal_sum_of_square_mean_err(self, split_idx, y, indicator): + left_child_indicator = indicator * np.concatenate( + [np.ones(split_idx), + np.zeros(len(y) - split_idx)]) + right_child_indicator = indicator - left_child_indicator + + left_avg_value = np.dot(left_child_indicator, + y) / np.sum(left_child_indicator) + right_avg_value = np.dot(right_child_indicator, + y) / np.sum(right_child_indicator) + return np.sum((y * indicator - left_avg_value * left_child_indicator - + right_avg_value * right_child_indicator)**2) + + def cal_gain(self, split_idx, y, indicator): + if self.task_type == 'classification': + return self.cal_gini(split_idx, y, indicator) + elif self.task_type == 'regression': + return self.cal_sum_of_square_mean_err(split_idx, y, indicator) + else: + raise ValueError(f'Task type error: {self.task_type}') + + def set_task_type(self, task_type): + self.task_type = task_type + + def set_weight(self, node_num): + active_idx = np.nonzero(self.tree[node_num].indicator)[0] + active_y = self.tree[node_num].label[active_idx] + + # majority vote in classification + if self.task_type == 'classification': + vote = np.mean(active_y) + self.tree[node_num].weight = 1 if vote >= 0.5 else 0 + # mean value for regression + elif self.task_type == 'regression': + self.tree[node_num].weight = np.mean(active_y) + else: + raise ValueError + + def update_child(self, node_num, left_child, right_child): + self.tree[2 * node_num + + 1].label = self.tree[node_num].label * left_child + self.tree[2 * node_num + + 1].indicator = self.tree[node_num].indicator * left_child + self.tree[2 * node_num + + 
2].label = self.tree[node_num].label * right_child + self.tree[2 * node_num + + 2].indicator = self.tree[node_num].indicator * right_child + + class MultipleXGBTrees(object): def __init__(self, max_depth, lambda_, gamma, num_of_trees): self.trees = [ @@ -140,3 +225,29 @@ def __init__(self, max_depth, lambda_, gamma, num_of_trees): def __getitem__(self, item): return self.trees[item] + + +class RandomForest(object): + def __init__(self, max_depth, lambda_, gamma, num_of_trees): + self.trees = [ + DecisionTree(max_depth=max_depth, lambda_=lambda_, gamma=gamma) + for _ in range(num_of_trees) + ] + self.num_of_trees = num_of_trees + self.lambda_ = lambda_ + self.gamma = gamma + self.max_depth = max_depth + + def __getitem__(self, item): + return self.trees[item] + + def set_task_type(self, criterion_type): + if criterion_type == 'CrossEntropyLoss': + task_type = 'classification' + elif 'regression' in criterion_type.lower(): + task_type = 'regression' + else: + raise ValueError + + for tree in self.trees: + tree.set_task_type(task_type) diff --git a/federatedscope/vertical_fl/model/__init__.py b/federatedscope/vertical_fl/model/__init__.py index 9bedec143..af3d22aba 100644 --- a/federatedscope/vertical_fl/model/__init__.py +++ b/federatedscope/vertical_fl/model/__init__.py @@ -1,4 +1,4 @@ from federatedscope.vertical_fl.model.Tree \ - import MultipleXGBTrees, MultipleGBDTTrees + import MultipleXGBTrees, MultipleGBDTTrees, RandomForest -__all__ = ['MultipleXGBTrees', 'MultipleGBDTTrees'] +__all__ = ['MultipleXGBTrees', 'MultipleGBDTTrees', 'RandomForest'] diff --git a/federatedscope/vertical_fl/model/model_builder.py b/federatedscope/vertical_fl/model/model_builder.py index 0794d7db7..0be960ce3 100644 --- a/federatedscope/vertical_fl/model/model_builder.py +++ b/federatedscope/vertical_fl/model/model_builder.py @@ -1,8 +1,8 @@ from federatedscope.vertical_fl.model \ - import MultipleXGBTrees, MultipleGBDTTrees + import MultipleXGBTrees, MultipleGBDTTrees, RandomForest -def get_tree_model(model_config): +def get_tree_model(model_config, criterion_type=None): if model_config.type.lower() == 'xgb_tree': return MultipleXGBTrees(max_depth=model_config.max_tree_depth, @@ -14,3 +14,8 @@ def get_tree_model(model_config): lambda_=model_config.lambda_, gamma=model_config.gamma, num_of_trees=model_config.num_of_trees) + elif model_config.type.lower() == 'random_forest': + return RandomForest(max_depth=model_config.max_tree_depth, + lambda_=model_config.lambda_, + gamma=model_config.gamma, + num_of_trees=model_config.num_of_trees) diff --git a/federatedscope/vertical_fl/trainer/__init__.py b/federatedscope/vertical_fl/trainer/__init__.py index 377c7c173..47ed0c512 100644 --- a/federatedscope/vertical_fl/trainer/__init__.py +++ b/federatedscope/vertical_fl/trainer/__init__.py @@ -1,3 +1,10 @@ from federatedscope.vertical_fl.trainer.trainer import VerticalTrainer from federatedscope.vertical_fl.trainer.feature_order_protected_trainer \ - import FeatureOrderProtectedTrainer + import createFeatureOrderProtectedTrainer +from federatedscope.vertical_fl.trainer.random_forest_trainer import \ + RandomForestTrainer + +__all__ = [ + 'VerticalTrainer', 'createFeatureOrderProtectedTrainer', + 'RandomForestTrainer' +] diff --git a/federatedscope/vertical_fl/trainer/feature_order_protected_trainer.py b/federatedscope/vertical_fl/trainer/feature_order_protected_trainer.py index 9d19d2112..b5f76f92a 100644 --- a/federatedscope/vertical_fl/trainer/feature_order_protected_trainer.py +++ 
b/federatedscope/vertical_fl/trainer/feature_order_protected_trainer.py @@ -1,236 +1,245 @@ import numpy as np -from federatedscope.vertical_fl.trainer.trainer import VerticalTrainer - - -class FeatureOrderProtectedTrainer(VerticalTrainer): - def __init__(self, model, data, device, config, monitor): - super(FeatureOrderProtectedTrainer, - self).__init__(model, data, device, config, monitor) - - assert config.vertical.protect_method != '', \ - "Please specify the adopted method for protecting feature order" - args = config.vertical.protect_args[0] if len( - config.vertical.protect_args) > 0 else {} - - if config.vertical.protect_method == 'dp': - self.bucket_num = args.get('bucket_num', 100) - self.epsilon = args.get('epsilon', None) - self.protect_funcs = self._protect_via_dp - self.split_value = None - elif config.vertical.protect_method == 'op_boost': - self.algo = args.get('algo', 'global') - self.protect_funcs = self._protect_via_op_boost - self.lower_bound = args.get('lower_bound', 1) - self.upper_bound = args.get('upper_bound', 100) - if self.algo == 'global': - self.epsilon = args.get('epsilon', 2) - elif self.algo == 'adjusting': - self.epsilon_prt = args.get('epsilon_prt', 2) - self.epsilon_ner = args.get('epsilon_ner', 2) - self.partition_num = args.get('partition_num', 10) + + +def createFeatureOrderProtectedTrainer(cls, model, data, device, config, + monitor): + class FeatureOrderProtectedTrainer(cls): + def __init__(self, model, data, device, config, monitor): + super(FeatureOrderProtectedTrainer, + self).__init__(model, data, device, config, monitor) + + assert config.vertical.protect_method != '', \ + "Please specify the method for protecting feature order" + args = config.vertical.protect_args[0] if len( + config.vertical.protect_args) > 0 else {} + + if config.vertical.protect_method == 'dp': + self.bucket_num = args.get('bucket_num', 100) + self.epsilon = args.get('epsilon', None) + self.protect_funcs = self._protect_via_dp + self.split_value = None + elif config.vertical.protect_method == 'op_boost': + self.algo = args.get('algo', 'global') + self.protect_funcs = self._protect_via_op_boost + self.lower_bound = args.get('lower_bound', 1) + self.upper_bound = args.get('upper_bound', 100) + if self.algo == 'global': + self.epsilon = args.get('epsilon', 2) + elif self.algo == 'adjusting': + self.epsilon_prt = args.get('epsilon_prt', 2) + self.epsilon_ner = args.get('epsilon_ner', 2) + self.partition_num = args.get('partition_num', 10) + else: + raise ValueError else: - raise ValueError - else: - raise ValueError(f"The method {args['method']} is not provided") - - def get_feature_value(self, feature_idx, value_idx): - if not hasattr(self, 'split_value') or self.split_value is None: - return super().get_feature_value(feature_idx=feature_idx, - value_idx=value_idx) - - return self.split_value[feature_idx][value_idx] - - def _bucketize(self, feature_order, bucket_size, bucket_num): - bucketized_feature_order = list() - for bucket_idx in range(bucket_num): - start = bucket_idx * bucket_size - end = min((bucket_idx + 1) * bucket_size, len(feature_order)) - bucketized_feature_order.append(feature_order[start:end]) - return bucketized_feature_order - - def _processed_data(self, data): - min_value = np.min(data, axis=0) - max_value = np.max(data, axis=0) - # To avoid data_max[i] == data_min[i], - for i in range(data.shape[1]): - if max_value[i] == min_value[i]: - max_value[i] += 1 - return np.round(self.lower_bound + (data - min_value) / - (max_value - min_value) * - 
(self.upper_bound - self.lower_bound)) - - def _global_mapping_fun(self, x, epsilon, lower_bound, upper_bound): - probs = list() - denominator = np.sum( - np.exp(-np.abs(x - np.array(range(lower_bound, upper_bound + 1))) * - epsilon / 2)) - for k in range(lower_bound, upper_bound + 1): - probs.append(np.exp(-np.abs(x - k) * epsilon / 2) / denominator) - res = np.random.choice(list(range(lower_bound, upper_bound + 1)), - p=probs) - - return res - - def _adjusting_mapping_fun(self, x, partition_edges): - for part_idx in range(self.partition_num): - if partition_edges[part_idx] < x and partition_edges[part_idx + - 1] >= x: - selected_part = self._global_mapping_fun( - part_idx, - epsilon=self.epsilon_prt, - lower_bound=0, - upper_bound=self.partition_num - 1) - res = self._global_mapping_fun( - x, - epsilon=self.epsilon_ner, - lower_bound=partition_edges[selected_part] + 1, - upper_bound=partition_edges[selected_part + 1]) - - return res - - def _op_boost_global(self, data): - - processed_data = self._processed_data(data=data) - mapped_data = np.vectorize(self._global_mapping_fun)( - processed_data, - epsilon=self.epsilon, - lower_bound=self.lower_bound, - upper_bound=self.upper_bound) - - return mapped_data - - def _op_boost_adjusting(self, data): - - processed_data = self._processed_data(data=data) - quantiles = np.linspace(0, 100, self.partition_num + 1) - partition_edges = np.round( - np.asarray( - np.percentile( - list(range(self.lower_bound - 1, self.upper_bound + 1)), - quantiles))) - partition_edges = [int(x) for x in partition_edges] - mapped_data = np.vectorize(self._adjusting_mapping_fun, - signature='(),(n)->()')( - processed_data, - partition_edges=partition_edges) - - return mapped_data - - def _protect_via_op_boost(self, raw_feature_order, data): - """ - Add random noises to feature order for privacy protection. 
- For more details, please see + raise ValueError( + f"The method {args['method']} is not provided") + + def get_feature_value(self, feature_idx, value_idx): + if not hasattr(self, 'split_value') or self.split_value is None: + return super().get_feature_value(feature_idx=feature_idx, + value_idx=value_idx) + + return self.split_value[feature_idx][value_idx] + + def _bucketize(self, feature_order, bucket_size, bucket_num): + bucketized_feature_order = list() + for bucket_idx in range(bucket_num): + start = bucket_idx * bucket_size + end = min((bucket_idx + 1) * bucket_size, len(feature_order)) + bucketized_feature_order.append(feature_order[start:end]) + return bucketized_feature_order + + def _processed_data(self, data): + min_value = np.min(data, axis=0) + max_value = np.max(data, axis=0) + # To avoid data_max[i] == data_min[i], + for i in range(data.shape[1]): + if max_value[i] == min_value[i]: + max_value[i] += 1 + return np.round(self.lower_bound + (data - min_value) / + (max_value - min_value) * + (self.upper_bound - self.lower_bound)) + + def _global_mapping_fun(self, x, epsilon, lower_bound, upper_bound): + probs = list() + denominator = np.sum( + np.exp( + -np.abs(x - np.array(range(lower_bound, upper_bound + 1))) + * epsilon / 2)) + for k in range(lower_bound, upper_bound + 1): + probs.append( + np.exp(-np.abs(x - k) * epsilon / 2) / denominator) + res = np.random.choice(list(range(lower_bound, upper_bound + 1)), + p=probs) + + return res + + def _adjusting_mapping_fun(self, x, partition_edges): + for part_idx in range(self.partition_num): + if partition_edges[part_idx] < x and partition_edges[part_idx + + 1] >= x: + selected_part = self._global_mapping_fun( + part_idx, + epsilon=self.epsilon_prt, + lower_bound=0, + upper_bound=self.partition_num - 1) + res = self._global_mapping_fun( + x, + epsilon=self.epsilon_ner, + lower_bound=partition_edges[selected_part] + 1, + upper_bound=partition_edges[selected_part + 1]) + + return res + + def _op_boost_global(self, data): + + processed_data = self._processed_data(data=data) + mapped_data = np.vectorize(self._global_mapping_fun)( + processed_data, + epsilon=self.epsilon, + lower_bound=self.lower_bound, + upper_bound=self.upper_bound) + + return mapped_data + + def _op_boost_adjusting(self, data): + + processed_data = self._processed_data(data=data) + quantiles = np.linspace(0, 100, self.partition_num + 1) + partition_edges = np.round( + np.asarray( + np.percentile( + list(range(self.lower_bound - 1, + self.upper_bound + 1)), quantiles))) + partition_edges = [int(x) for x in partition_edges] + mapped_data = np.vectorize(self._adjusting_mapping_fun, + signature='(),(n)->()')( + processed_data, + partition_edges=partition_edges) + + return mapped_data + + def _protect_via_op_boost(self, raw_feature_order, data): + """ + Add random noises to feature order for privacy protection. 
+ For more details, please see OpBoost: A Vertical Federated Tree Boosting Framework Based on Order-Preserving Desensitization.pdf (https://arxiv.org/pdf/2210.01318.pdf) - """ - if self.algo == 'global': - mapped_data = self._op_boost_global(data) - elif self.algo == 'adjusting': - mapped_data = self._op_boost_adjusting(data) - else: - mapped_data = None - assert mapped_data is not None - - # Get feature order based on mapped data - num_of_feature = mapped_data.shape[1] - protected_feature_order = [0] * num_of_feature - for i in range(num_of_feature): - protected_feature_order[i] = mapped_data[:, i].argsort() - - return { - 'raw_feature_order': raw_feature_order, - 'feature_order': protected_feature_order, - } - - def _protect_via_dp(self, raw_feature_order, data): - """ - Bucketize and add dp noise to feature order for privacy protection. - For more details, please refer to - FederBoost: Private Federated Learning for GBDT - (https://arxiv.org/pdf/2011.02796.pdf) - """ - protected_feature_order = list() - bucket_size = int( - np.ceil(self.cfg.dataloader.batch_size / self.bucket_num)) - if self.epsilon is None: - prob_for_preserving = 1.0 - else: - _tmp = np.power(np.e, self.epsilon) - prob_for_preserving = _tmp / (_tmp + self.bucket_num - 1) - prob_for_moving = (1.0 - prob_for_preserving) / (self.bucket_num - 1) - split_position = [] - self.split_value = [] - - for feature_idx in range(len(raw_feature_order)): - bucketized_feature_order = self._bucketize( - raw_feature_order[feature_idx], bucket_size, self.bucket_num) - noisy_bucketizd_feature_order = [[] - for _ in range(self.bucket_num)] - - # Add noise to bucketized feature order - for bucket_idx in range(self.bucket_num): - probs = np.ones(self.bucket_num) * prob_for_moving - probs[bucket_idx] = prob_for_preserving - for each in bucketized_feature_order[bucket_idx]: - selected_bucket_idx = np.random.choice(list( - range(self.bucket_num)), - p=probs) - noisy_bucketizd_feature_order[selected_bucket_idx].append( - each) - - # Save split positions (instance number within buckets) - # We exclude the endpoints to avoid empty sub-trees - _split_position = list() - _split_value = dict() - accumu_num = 0 - for bucket_idx, each_bucket in enumerate( - noisy_bucketizd_feature_order): - instance_num = len(each_bucket) - # Skip the empty bucket - if instance_num != 0: - # Skip the endpoints - if bucket_idx != self.bucket_num - 1: - _split_position.append(accumu_num + instance_num) - - # Save split values: average of min value of (j-1)-th - # bucket and max value of j-th bucket - max_value = data[bucketized_feature_order[bucket_idx] - [-1]][feature_idx] - min_value = data[bucketized_feature_order[bucket_idx] - [0]][feature_idx] - if accumu_num == 0: - _split_value[accumu_num + - instance_num] = max_value / 2.0 - elif bucket_idx == self.bucket_num - 1: - _split_value[accumu_num] += min_value / 2.0 - else: - _split_value[accumu_num] += min_value / 2.0 - _split_value[accumu_num + - instance_num] = max_value / 2.0 - - accumu_num += instance_num - - split_position.append(_split_position) - self.split_value.append(_split_value) - - [np.random.shuffle(x) for x in noisy_bucketizd_feature_order] - noisy_bucketizd_feature_order = np.concatenate( - noisy_bucketizd_feature_order) - protected_feature_order.append(noisy_bucketizd_feature_order) - - extra_info = {'split_position': split_position} - - return { - 'raw_feature_order': raw_feature_order, - 'feature_order': protected_feature_order, - 'extra_info': extra_info - } - - # TODO: more flexible for client to 
choose whether to protect or not - def _get_feature_order_info(self, data): - num_of_feature = data.shape[1] - feature_order = [0] * num_of_feature - for i in range(num_of_feature): - feature_order[i] = data[:, i].argsort() - return self.protect_funcs(feature_order, data) + """ + if self.algo == 'global': + mapped_data = self._op_boost_global(data) + elif self.algo == 'adjusting': + mapped_data = self._op_boost_adjusting(data) + else: + mapped_data = None + assert mapped_data is not None + + # Get feature order based on mapped data + num_of_feature = mapped_data.shape[1] + protected_feature_order = [0] * num_of_feature + for i in range(num_of_feature): + protected_feature_order[i] = mapped_data[:, i].argsort() + + return { + 'raw_feature_order': raw_feature_order, + 'feature_order': protected_feature_order, + } + + def _protect_via_dp(self, raw_feature_order, data): + """ + Bucketize and add dp noise to feature order for protection. + For more details, please refer to + FederBoost: Private Federated Learning for GBDT + (https://arxiv.org/pdf/2011.02796.pdf) + """ + protected_feature_order = list() + bucket_size = int( + np.ceil(self.cfg.dataloader.batch_size / self.bucket_num)) + if self.epsilon is None: + prob_for_preserving = 1.0 + else: + _tmp = np.power(np.e, self.epsilon) + prob_for_preserving = _tmp / (_tmp + self.bucket_num - 1) + prob_for_moving = (1.0 - prob_for_preserving) / (self.bucket_num - + 1) + split_position = [] + self.split_value = [] + + for feature_idx in range(len(raw_feature_order)): + bucketized_feature_order = self._bucketize( + raw_feature_order[feature_idx], bucket_size, + self.bucket_num) + noisy_bucketizd_feature_order = [ + [] for _ in range(self.bucket_num) + ] + + # Add noise to bucketized feature order + for bucket_idx in range(self.bucket_num): + probs = np.ones(self.bucket_num) * prob_for_moving + probs[bucket_idx] = prob_for_preserving + for each in bucketized_feature_order[bucket_idx]: + selected_bucket_idx = np.random.choice(list( + range(self.bucket_num)), + p=probs) + noisy_bucketizd_feature_order[ + selected_bucket_idx].append(each) + + # Save split positions (instance number within buckets) + # We exclude the endpoints to avoid empty sub-trees + _split_position = list() + _split_value = dict() + accumu_num = 0 + for bucket_idx, each_bucket in enumerate( + noisy_bucketizd_feature_order): + instance_num = len(each_bucket) + # Skip the empty bucket + if instance_num != 0: + # Skip the endpoints + if bucket_idx != self.bucket_num - 1: + _split_position.append(accumu_num + instance_num) + + # Save split values: average of min value of (j-1)-th + # bucket and max value of j-th bucket + max_value = data[bucketized_feature_order[bucket_idx] + [-1]][feature_idx] + min_value = data[bucketized_feature_order[bucket_idx] + [0]][feature_idx] + if accumu_num == 0: + _split_value[accumu_num + + instance_num] = max_value / 2.0 + elif bucket_idx == self.bucket_num - 1: + _split_value[accumu_num] += min_value / 2.0 + else: + _split_value[accumu_num] += min_value / 2.0 + _split_value[accumu_num + + instance_num] = max_value / 2.0 + + accumu_num += instance_num + + split_position.append(_split_position) + self.split_value.append(_split_value) + + [np.random.shuffle(x) for x in noisy_bucketizd_feature_order] + noisy_bucketizd_feature_order = np.concatenate( + noisy_bucketizd_feature_order) + protected_feature_order.append(noisy_bucketizd_feature_order) + + extra_info = {'split_position': split_position} + + return { + 'raw_feature_order': raw_feature_order, + 
'feature_order': protected_feature_order, + 'extra_info': extra_info + } + + # TODO: more flexible for client to choose whether to protect or not + def _get_feature_order_info(self, data): + num_of_feature = data.shape[1] + feature_order = [0] * num_of_feature + for i in range(num_of_feature): + feature_order[i] = data[:, i].argsort() + return self.protect_funcs(feature_order, data) + + return FeatureOrderProtectedTrainer(model, data, device, config, monitor) diff --git a/federatedscope/vertical_fl/trainer/random_forest_trainer.py b/federatedscope/vertical_fl/trainer/random_forest_trainer.py new file mode 100644 index 000000000..465792a0d --- /dev/null +++ b/federatedscope/vertical_fl/trainer/random_forest_trainer.py @@ -0,0 +1,83 @@ +import numpy as np + +from federatedscope.vertical_fl.trainer import VerticalTrainer +from federatedscope.vertical_fl.dataloader.utils import VerticalDataSampler +from federatedscope.vertical_fl.loss.utils import get_vertical_loss + + +class RandomForestTrainer(VerticalTrainer): + def __init__(self, model, data, device, config, monitor): + super(RandomForestTrainer, self).__init__(model, data, device, config, + monitor) + + def _init_for_train(self): + self.eta = 1.0 + self.model.set_task_type(self.cfg.criterion.type) + self.dataloader = VerticalDataSampler( + data=self.data['train'], + replace=True, + use_full_trainset=False, + feature_frac=self.cfg.vertical.feature_subsample_ratio) + self.criterion = get_vertical_loss(loss_type=self.cfg.criterion.type, + model_type=self.cfg.model.type) + + def _compute_for_root(self, tree_num): + node_num = 0 + self.model[tree_num][node_num].label = self.batch_y + self.model[tree_num][node_num].indicator = np.ones(len(self.batch_y)) + return self._compute_for_node(tree_num, node_num=node_num) + + def _get_ordered_indicator_and_label(self, tree_num, node_num, + feature_idx): + order = self.merged_feature_order[feature_idx] + ordered_indicator = self.model[tree_num][node_num].indicator[order] + ordered_label = self.model[tree_num][node_num].label[order] + return ordered_indicator, ordered_label + + def _get_best_gain(self, tree_num, node_num): + if self.cfg.criterion.type == 'CrossEntropyLoss': + default_gain = 1 + elif 'Regression' in self.cfg.criterion.type: + default_gain = float('inf') + else: + raise ValueError + + split_ref = {'feature_idx': None, 'value_idx': None} + best_gain = default_gain + feature_num = len(self.merged_feature_order) + split_position = None + if self.extra_info is not None: + split_position = self.extra_info.get('split_position', None) + + activate_idx = [ + np.nonzero(self.model[tree_num][node_num].indicator[order])[0] + for order in self.merged_feature_order + ] + activate_idx = np.asarray(activate_idx) + if split_position is None: + # The left/right sub-tree cannot be empty + split_position = activate_idx[:, 1:] + else: + active_split_position = list() + for idx, each_split_position in enumerate(split_position): + active_split_position.append([ + x for x in each_split_position + if x in activate_idx[idx, 1:] + ]) + split_position = active_split_position + + for feature_idx in range(feature_num): + if len(split_position[feature_idx]) == 0: + continue + ordered_indicator, ordered_label = \ + self._get_ordered_indicator_and_label( + tree_num, node_num, feature_idx) + for value_idx in split_position[feature_idx]: + gain = self.model[tree_num].cal_gain(value_idx, ordered_label, + ordered_indicator) + if gain < best_gain: + best_gain = gain + split_ref['feature_idx'] = feature_idx + 
split_ref['value_idx'] = value_idx + + return best_gain < default_gain, split_ref diff --git a/federatedscope/vertical_fl/trainer/trainer.py b/federatedscope/vertical_fl/trainer/trainer.py index 51911d655..34a1d6ae0 100644 --- a/federatedscope/vertical_fl/trainer/trainer.py +++ b/federatedscope/vertical_fl/trainer/trainer.py @@ -2,7 +2,7 @@ import logging from collections import deque -from federatedscope.vertical_fl.dataloader.utils import batch_iter +from federatedscope.vertical_fl.dataloader.utils import VerticalDataSampler from federatedscope.vertical_fl.loss.utils import get_vertical_loss logger = logging.getLogger(__name__) @@ -16,25 +16,64 @@ def __init__(self, model, data, device, config, monitor): self.cfg = config self.monitor = monitor - self.eta = config.train.optimizer.eta - self.merged_feature_order = None self.client_feature_order = None + self.complete_feature_order_info = None + self.client_feature_num = list() self.extra_info = None self.batch_x = None self.batch_y = None self.batch_y_hat = None - self.batch_z = None - - def prepare_for_train(self, index=None): - self.dataloader = batch_iter(self.data['train'], - self.cfg.dataloader.batch_size, - shuffled=True) - self.criterion = get_vertical_loss( - self.cfg.criterion.type, - cal_hess=(self.cfg.model.type == 'xgb_tree')) - batch_index, self.batch_x, self.batch_y = self._fetch_train_data(index) - feature_order_info = self._get_feature_order_info(self.batch_x) + self.batch_z = 0 + + def _init_for_train(self): + self.eta = self.cfg.train.optimizer.eta + self.dataloader = VerticalDataSampler( + data=self.data['train'], + use_full_trainset=True, + feature_frac=self.cfg.vertical.feature_subsample_ratio) + self.criterion = get_vertical_loss(loss_type=self.cfg.criterion.type, + model_type=self.cfg.model.type) + + def prepare_for_train(self): + if self.dataloader.use_full_trainset: + complete_feature_order_info = self._get_feature_order_info( + self.data['train']['x']) + self.complete_feature_order_info = complete_feature_order_info + else: + self.complete_feature_order_info = None + + def fetch_train_data(self, index=None): + # Clear the variables for last training round + self.client_feature_num.clear() + + # Fetch new data + batch_index, self.batch_x, self.batch_y = self.dataloader.sample_data( + sample_size=self.cfg.dataloader.batch_size, index=index) + feature_index, self.batch_x = self.dataloader.sample_feature( + self.batch_x) + + # If the complete trainset is used, we only need to get the slices + # from the complete feature order info according to the feature index, + # rather than re-ordering the instance + if self.dataloader.use_full_trainset: + assert self.complete_feature_order_info is not None + feature_order_info = dict() + for key in self.complete_feature_order_info: + if isinstance(self.complete_feature_order_info[key], + list) or isinstance( + self.complete_feature_order_info[key], + np.ndarray): + feature_order_info[key] = [ + self.complete_feature_order_info[key][_index] + for _index in feature_index + ] + else: + feature_order_info[key] = self.complete_feature_order_info[ + key] + else: + feature_order_info = self._get_feature_order_info(self.batch_x) + if 'raw_feature_order' in feature_order_info: # When applying protect method, the raw (real) feature order might # be different from the shared feature order @@ -42,17 +81,13 @@ def prepare_for_train(self, index=None): feature_order_info.pop('raw_feature_order') else: self.client_feature_order = feature_order_info['feature_order'] - if index is None: - 
self.batch_y_hat = np.random.uniform(low=0.0, - high=1.0, - size=len(self.batch_y)) - self.batch_z = 0 + return batch_index, feature_order_info def train(self, feature_order_info=None, tree_num=0, node_num=None): # Start to build a tree if node_num is None: - if tree_num == 0 and feature_order_info is not None: + if feature_order_info is not None: self.merged_feature_order, self.extra_info = \ self._parse_feature_order(feature_order_info) return self._compute_for_root(tree_num=tree_num) @@ -60,6 +95,12 @@ def train(self, feature_order_info=None, tree_num=0, node_num=None): else: return self._compute_for_node(tree_num, node_num) + def get_abs_feature_idx(self, rel_feature_idx): + if self.dataloader.selected_feature_index is None: + return rel_feature_idx + else: + return self.dataloader.selected_feature_index[rel_feature_idx] + def get_feature_value(self, feature_idx, value_idx): assert self.batch_x is not None @@ -69,17 +110,15 @@ def get_feature_value(self, feature_idx, value_idx): def _predict(self, tree_num): self._compute_weight(tree_num, node_num=0) - def _fetch_train_data(self, index=None): - if index is None: - return next(self.dataloader) - else: - return index, self.data['train']['x'][index], None - def _parse_feature_order(self, feature_order_info): client_ids = list(feature_order_info.keys()) client_ids = sorted(client_ids) - merged_feature_order = np.concatenate( - [feature_order_info[idx]['feature_order'] for idx in client_ids]) + merged_feature_order = list() + for each_client in client_ids: + _feature_order = feature_order_info[each_client]['feature_order'] + merged_feature_order.append(_feature_order) + self.client_feature_num.append(len(_feature_order)) + merged_feature_order = np.concatenate(merged_feature_order) # TODO: different extra_info for different clients extra_info = feature_order_info[client_ids[0]].get('extra_info', None) @@ -116,17 +155,28 @@ def _get_best_gain(self, tree_num, node_num): best_gain = 0 split_ref = {'feature_idx': None, 'value_idx': None} - instance_num = self.batch_x.shape[0] feature_num = len(self.merged_feature_order) + split_position = None if self.extra_info is not None: - split_position = self.extra_info.get( - 'split_position', - [range(instance_num) for _ in range(feature_num)]) - else: + split_position = self.extra_info.get('split_position', None) + + activate_idx = [ + np.nonzero(self.model[tree_num][node_num].indicator[order])[0] + for order in self.merged_feature_order + ] + activate_idx = np.asarray(activate_idx) + if split_position is None: # The left/right sub-tree cannot be empty - split_position = [ - range(1, instance_num) for _ in range(feature_num) - ] + split_position = activate_idx[:, 1:] + else: + active_split_position = list() + for idx, each_split_position in enumerate(split_position): + active_split_position.append([ + x for x in each_split_position + if x in activate_idx[idx, 1:] + ]) + split_position = active_split_position + for feature_idx in range(feature_num): ordered_g, ordered_h = self._get_ordered_gh( tree_num, node_num, feature_idx) @@ -139,9 +189,14 @@ def _get_best_gain(self, tree_num, node_num): split_ref['feature_idx'] = feature_idx split_ref['value_idx'] = value_idx - return best_gain, split_ref + return best_gain > 0, split_ref def _compute_for_root(self, tree_num): + if self.batch_y_hat is None: + # Assign a random predictions when tree_num = 0 + self.batch_y_hat = [ + np.random.uniform(low=0.0, high=1.0, size=len(self.batch_y)) + ] g, h = self.criterion.get_grad_and_hess(self.batch_y, self.batch_y_hat) 
node_num = 0 self.model[tree_num][node_num].grad = g @@ -164,8 +219,8 @@ def _compute_for_node(self, tree_num, node_num): return self._compute_for_node(tree_num, node_num + 1) # Calculate best gain else: - best_gain, split_ref = self._get_best_gain(tree_num, node_num) - if best_gain > 0: + improved_flag, split_ref = self._get_best_gain(tree_num, node_num) + if improved_flag: split_feature = self.merged_feature_order[ split_ref['feature_idx']] left_child = np.zeros(self.batch_x.shape[0]) @@ -185,9 +240,9 @@ def _compute_for_node(self, tree_num, node_num): def _compute_weight(self, tree_num, node_num): if node_num >= 2**self.model.max_depth - 1: if tree_num == 0: - self.batch_y_hat = self.batch_z + self.batch_y_hat = [self.batch_z] else: - self.batch_y_hat += self.batch_z + self.batch_y_hat.append(self.batch_z) self.batch_z = 0 else: diff --git a/federatedscope/vertical_fl/trainer/utils.py b/federatedscope/vertical_fl/trainer/utils.py index c7c75939f..1dce8b6b8 100644 --- a/federatedscope/vertical_fl/trainer/utils.py +++ b/federatedscope/vertical_fl/trainer/utils.py @@ -1,21 +1,27 @@ from federatedscope.vertical_fl.trainer import VerticalTrainer, \ - FeatureOrderProtectedTrainer + RandomForestTrainer, createFeatureOrderProtectedTrainer def get_vertical_trainer(config, model, data, device, monitor): + if config.model.type.lower() == 'random_forest': + trainer_cls = RandomForestTrainer + else: + trainer_cls = VerticalTrainer + protect_object = config.vertical.protect_object if not protect_object or protect_object == '': - return VerticalTrainer(model=model, - data=data, - device=device, - config=config, - monitor=monitor) + return trainer_cls(model=model, + data=data, + device=device, + config=config, + monitor=monitor) elif protect_object == 'feature_order': - return FeatureOrderProtectedTrainer(model=model, - data=data, - device=device, - config=config, - monitor=monitor) + return createFeatureOrderProtectedTrainer(cls=trainer_cls, + model=model, + data=data, + device=device, + config=config, + monitor=monitor) else: raise ValueError diff --git a/federatedscope/vertical_fl/utils.py b/federatedscope/vertical_fl/utils.py index b41a3c323..d3e5d6947 100644 --- a/federatedscope/vertical_fl/utils.py +++ b/federatedscope/vertical_fl/utils.py @@ -4,7 +4,7 @@ def wrap_vertical_server(server, config): - if config.vertical.algo in ['xgb', 'gbdt']: + if config.vertical.algo in ['xgb', 'gbdt', 'rf']: server = wrap_server_for_train(server) server = wrap_server_for_evaluation(server) @@ -12,7 +12,7 @@ def wrap_vertical_server(server, config): def wrap_vertical_client(client, config): - if config.vertical.algo in ['xgb', 'gbdt']: + if config.vertical.algo in ['xgb', 'gbdt', 'rf']: client = wrap_client_for_train(client) client = wrap_client_for_evaluation(client) diff --git a/federatedscope/vertical_fl/xgb_base/baseline/gbdt_base_on_adult.yaml b/federatedscope/vertical_fl/xgb_base/baseline/gbdt_base_on_adult.yaml index a2104e5f8..987c05f69 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/gbdt_base_on_adult.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/gbdt_base_on_adult.yaml @@ -29,6 +29,7 @@ vertical: use: True dims: [7, 14] algo: 'gbdt' + data_size_for_debug: 2000 eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/rf_base_on_adult.yaml b/federatedscope/vertical_fl/xgb_base/baseline/rf_base_on_adult.yaml new file mode 100644 index 000000000..e8d176fa1 --- /dev/null +++ 
b/federatedscope/vertical_fl/xgb_base/baseline/rf_base_on_adult.yaml @@ -0,0 +1,32 @@ +use_gpu: False +device: 0 +backend: torch +federate: + mode: standalone + client_num: 2 +model: + type: random_forest + lambda_: 0.1 + gamma: 0 + num_of_trees: 10 + max_tree_depth: 5 +data: + root: data/ + type: adult + splits: [1.0, 0.0] +dataloader: + type: raw + batch_size: 2000 +criterion: + type: CrossEntropyLoss +trainer: + type: verticaltrainer +vertical: + use: True + dims: [7, 14] + algo: 'rf' + data_size_for_debug: 1500 + feature_subsample_ratio: 0.8 +eval: + freq: 3 + best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml index 479dae7d7..ce18b6e51 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml @@ -29,6 +29,7 @@ vertical: use: True dims: [4, 8] algo: 'xgb' + data_size_for_debug: 2000 eval: freq: 5 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml index 023679d20..c9ab0a345 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml @@ -29,6 +29,7 @@ vertical: use: True dims: [7, 14] algo: 'xgb' + data_size_for_debug: 2000 eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml index 1c44aeda5..d8b43b042 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml @@ -29,6 +29,7 @@ vertical: use: True dims: [10, 20] algo: 'xgb' + data_size_for_debug: 2000 eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml index 7e50ee12f..c325125b5 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml @@ -29,6 +29,7 @@ vertical: use: True dims: [5, 10] algo: 'xgb' + data_size_for_debug: 2000 eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_dp_on_adult.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_dp_on_adult.yaml index 9843c56b0..7cd3a5c00 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_dp_on_adult.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_dp_on_adult.yaml @@ -32,6 +32,7 @@ vertical: protect_object: 'feature_order' protect_method: 'dp' protect_args: [{'bucket_num': 100, 'epsilon':10}] + data_size_for_debug: 2000 eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_op_boost_on_adult.yaml 
diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_op_boost_on_adult.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_op_boost_on_adult.yaml
index 80b357165..9962d3810 100644
--- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_op_boost_on_adult.yaml
+++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_op_boost_on_adult.yaml
@@ -32,6 +32,7 @@ vertical:
   protect_object: 'feature_order'
   protect_method: 'op_boost'
   protect_args: [{'algo': 'global'}]
+  data_size_for_debug: 2000
 eval:
   freq: 3
   best_res_update_round_wise_key: test_loss
\ No newline at end of file
diff --git a/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py b/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py
index a4d64d5df..72e467507 100644
--- a/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py
+++ b/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py
@@ -61,6 +61,7 @@ def eval(self, tree_num):
 
     def _init_data_related_var(self):
 
+        self.trainer._init_for_train()
         self.test_x = None
         self.test_y = None
 
@@ -71,25 +72,17 @@ def _init_data_related_var(self):
 
     def callback_func_for_model_para(self, message: Message):
         self.state = message.state
+        self.trainer.prepare_for_train()
         if self.own_label:
-            batch_index, feature_order_info = self.trainer.prepare_for_train()
-            self.feature_order = feature_order_info['feature_order']
-            self.msg_buffer[self.ID] = feature_order_info
-            receiver = [
-                each for each in list(self.comm_manager.neighbors.keys())
-                if each not in [self.ID, self.server_id]
-            ]
-            self.comm_manager.send(
-                Message(msg_type='data_sample',
-                        sender=self.ID,
-                        state=self.state,
-                        receiver=receiver,
-                        content=batch_index))
+            batch_index, feature_order_info = self.trainer.fetch_train_data()
+            self.start_a_new_training_round(batch_index,
+                                            feature_order_info,
+                                            tree_num=0)
 
     # other clients receive the data-sample information
     def callback_func_for_data_sample(self, message: Message):
         batch_index, sender = message.content, message.sender
-        _, feature_order_info = self.trainer.prepare_for_train(
+        _, feature_order_info = self.trainer.fetch_train_data(
             index=batch_index)
         self.feature_order = feature_order_info['feature_order']
         self.comm_manager.send(
@@ -110,6 +103,25 @@ def callback_func_for_finish(self, message: Message):
                     f"=================")
         # self._monitor.finish_fl()
 
+    def start_a_new_training_round(self,
+                                   batch_index,
+                                   feature_order_info,
+                                   tree_num=0):
+        self.msg_buffer.clear()
+        self.feature_order = feature_order_info['feature_order']
+        self.msg_buffer[self.ID] = feature_order_info
+        self.state = tree_num
+        receiver = [
+            each for each in list(self.comm_manager.neighbors.keys())
+            if each not in [self.ID, self.server_id]
+        ]
+        self.comm_manager.send(
+            Message(msg_type='data_sample',
+                    sender=self.ID,
+                    state=self.state,
+                    receiver=receiver,
+                    content=batch_index))
+
     def check_and_move_on(self):
         if len(self.msg_buffer) == self.client_num:
             received_feature_order_infos = self.msg_buffer
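The XGBClient changes above move the per-tree kickoff into start_a_new_training_round: the label owner clears its message buffer, draws a batch through the trainer, stores its own (possibly protected) feature-order info, and broadcasts the batch index to the other clients as a 'data_sample' message; check_and_move_on then waits until feature-order info from every client has arrived. Sketched very roughly from the label owner's point of view (illustration only; in the real code these steps are driven by asynchronous message callbacks):

    # Rough per-tree control flow on the label-owning client (illustrative sketch).
    def run_one_tree(client, tree_num):
        batch_index, order_info = client.trainer.fetch_train_data()
        client.start_a_new_training_round(batch_index, order_info, tree_num=tree_num)
        # ...other clients answer with their feature-order info...
        client.check_and_move_on()  # once all infos are buffered, tree building starts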
diff --git a/federatedscope/vertical_fl/xgb_base/worker/evaluation_wrapper.py b/federatedscope/vertical_fl/xgb_base/worker/evaluation_wrapper.py
index bc769867d..fdda38835 100644
--- a/federatedscope/vertical_fl/xgb_base/worker/evaluation_wrapper.py
+++ b/federatedscope/vertical_fl/xgb_base/worker/evaluation_wrapper.py
@@ -10,12 +10,12 @@ def wrap_client_for_evaluation(client):
     def eval(self, tree_num):
-        self.criterion = get_vertical_loss(
-            self._cfg.criterion.type,
-            cal_hess=(self._cfg.model.type == 'xgb_tree'))
+        self.criterion = get_vertical_loss(loss_type=self._cfg.criterion.type,
+                                           model_type=self._cfg.model.type)
         if self.test_x is None:
             self.test_x, self.test_y = self._fetch_test_data()
-            self.test_result = np.zeros(self.test_x.shape[0])
+        self.merged_test_result = list()
+        self.test_result = np.zeros(self.test_x.shape[0])
         self.model[tree_num][0].indicator = np.ones(self.test_x.shape[0])
         self._test_for_node(tree_num, node_num=0)
 
@@ -26,8 +26,10 @@ def _fetch_test_data(self):
         return test_x, test_y
 
     def _feedback_eval_metrics(self):
-        test_loss = self.criterion.get_loss(self.test_y, self.test_result)
-        metrics = self.criterion.get_metric(self.test_y, self.test_result)
+        test_loss = self.criterion.get_loss(self.test_y,
+                                            self.merged_test_result)
+        metrics = self.criterion.get_metric(self.test_y,
+                                            self.merged_test_result)
         modified_metrics = dict()
         for key in metrics.keys():
             if 'test' not in key:
@@ -65,6 +67,7 @@ def _feedback_eval_metrics(self):
     def _test_for_node(self, tree_num, node_num):
         # All nodes have been traversed
         if node_num >= 2**self.model.max_depth - 1:
+            self.merged_test_result.append(self.test_result)
             if (
                     tree_num + 1
             ) % self._cfg.eval.freq == 0 or \
@@ -74,9 +77,13 @@ def _test_for_node(self, tree_num, node_num):
             self._check_eval_finish(tree_num)
         # The client owns the weight
         elif self.model[tree_num][node_num].weight:
+            if self._cfg.model.type in ['xgb_tree', 'gbdt_tree']:
+                eta = self._cfg.train.optimizer.eta
+            else:
+                eta = 1.0
             self.test_result += self.model[tree_num][
                 node_num].indicator * self.model[tree_num][
-                    node_num].weight * self._cfg.train.optimizer.eta
+                    node_num].weight * eta
             self._test_for_node(tree_num, node_num + 1)
         # Other client owns the weight, need to communicate
         elif self.model[tree_num][node_num].member:
@@ -92,7 +99,6 @@ def _test_for_node(self, tree_num, node_num):
     def callback_func_for_split_request(self, message: Message):
         if self.test_x is None:
             self.test_x, self.test_y = self._fetch_test_data()
-            self.test_result = np.zeros(self.test_x.shape[0])
         tree_num, node_num = message.content
         sender = message.sender
         feature_idx = self.model[tree_num][node_num].feature_idx
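With merged_test_result collecting one prediction vector per finished tree, evaluation can treat boosting and bagging uniformly: for xgb_tree/gbdt_tree each tree's leaf weights are scaled by the learning rate eta and the per-tree results are summed, while for random_forest eta is fixed to 1.0 and the per-tree results are presumably averaged by the loss utilities. A small numpy sketch of that assumed aggregation behaviour (not the loss utilities' actual code):

    import numpy as np

    def aggregate_tree_outputs(merged_test_result, model_type):
        # merged_test_result: list of per-tree prediction vectors, one entry per tree.
        stacked = np.vstack(merged_test_result)
        if model_type in ('xgb_tree', 'gbdt_tree'):
            return stacked.sum(axis=0)   # boosting: eta-scaled contributions add up
        return stacked.mean(axis=0)      # random forest: plain average over the trees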
diff --git a/federatedscope/vertical_fl/xgb_base/worker/train_wrapper.py b/federatedscope/vertical_fl/xgb_base/worker/train_wrapper.py
index 05f811ecb..fc8bb0a67 100644
--- a/federatedscope/vertical_fl/xgb_base/worker/train_wrapper.py
+++ b/federatedscope/vertical_fl/xgb_base/worker/train_wrapper.py
@@ -11,7 +11,6 @@ def train(self, tree_num, node_num=None, feature_order_info=None):
         if node_num is None:
             logger.info(f'----------- Building a new tree (Tree '
                         f'#{tree_num}) -------------')
-            self.state = tree_num
             finish_flag, results = self.trainer.train(
                 feature_order_info=feature_order_info, tree_num=tree_num)
         else:
@@ -31,17 +30,18 @@ def _check_eval_finish(self, tree_num):
         if self.eval_finish_flag:
             if tree_num + 1 < self._cfg.model.num_of_trees:
-                self.train(tree_num + 1)
+                batch_index, feature_order_info = \
+                    self.trainer.fetch_train_data()
+                self.start_a_new_training_round(batch_index,
+                                                feature_order_info,
+                                                tree_num=tree_num + 1)
 
     def _find_and_send_split(self, split_ref, tree_num, node_num):
-        for index, dim in enumerate(self._cfg.vertical.dims):
-            if split_ref['feature_idx'] < dim:
-                prefix = self._cfg.vertical.dims[index -
-                                                 1] if index != 0 else 0
+        accum_dim = 0
+        for index, dim in enumerate(self.trainer.client_feature_num):
+            if split_ref['feature_idx'] < accum_dim + dim:
                 client_id = index + 1
                 self.model[tree_num][node_num].member = client_id
-                self.model[tree_num][
-                    node_num].feature_idx = split_ref['feature_idx'] - prefix
                 self.comm_manager.send(
                     Message(msg_type='split',
@@ -49,19 +49,22 @@
                             state=self.state,
                             receiver=[client_id],
                             content=(tree_num, node_num,
-                                     split_ref['feature_idx'] - prefix,
+                                     split_ref['feature_idx'] - accum_dim,
                                      split_ref['value_idx'])))
                 break
+            else:
+                accum_dim += dim
 
     def callback_func_for_split(self, message: Message):
         tree_num, node_num, feature_idx, value_idx = message.content
         sender = message.sender
         self.state = message.state
-        self.feature_importance[feature_idx] += 1
+        abs_feature_idx = self.trainer.get_abs_feature_idx(feature_idx)
+        self.feature_importance[abs_feature_idx] += 1
         feature_value = self.trainer.get_feature_value(feature_idx, value_idx)
-        self.model[tree_num][node_num].feature_idx = feature_idx
+        self.model[tree_num][node_num].feature_idx = abs_feature_idx
         self.model[tree_num][node_num].feature_value = feature_value
 
         self.comm_manager.send(
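The rewritten _find_and_send_split above walks the per-client feature counts reported by the trainer (client_feature_num) instead of cfg.vertical.dims, which stays correct when each tree only sees a feature subsample, and callback_func_for_split converts the received local index back to an absolute one via get_abs_feature_idx before updating feature_importance. The index arithmetic is essentially the following (illustrative helper, not part of the codebase):

    def locate_feature(global_idx, client_feature_num):
        # Map a merged feature index to (owning client id, index local to that client).
        accum = 0
        for index, dim in enumerate(client_feature_num):
            if global_idx < accum + dim:
                return index + 1, global_idx - accum
            accum += dim
        raise ValueError('feature index out of range')

    # e.g. locate_feature(9, [7, 7]) -> (2, 2): client 2 owns merged feature 9, local index 2
    print(locate_feature(9, [7, 7]))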
diff --git a/tests/test_xgb_and_gbdt.py b/tests/test_tree_based_model_for_vfl.py
similarity index 83%
rename from tests/test_xgb_and_gbdt.py
rename to tests/test_tree_based_model_for_vfl.py
index 96fb23d12..6c8af31f6 100644
--- a/tests/test_xgb_and_gbdt.py
+++ b/tests/test_tree_based_model_for_vfl.py
@@ -25,14 +25,13 @@ def set_config_for_xgb_base(self, cfg):
         cfg.model.type = 'xgb_tree'
         cfg.model.lambda_ = 0.1
         cfg.model.gamma = 0
-        cfg.model.num_of_trees = 5
+        cfg.model.num_of_trees = 10
         cfg.model.max_tree_depth = 3
         cfg.train.optimizer.eta = 0.5
 
         cfg.data.root = 'test_data/'
         cfg.data.type = 'adult'
-        cfg.data.size = 2000
 
         cfg.dataloader.type = 'raw'
         cfg.dataloader.batch_size = 2000
@@ -42,6 +41,7 @@ def set_config_for_xgb_base(self, cfg):
         cfg.vertical.use = True
         cfg.vertical.dims = [7, 14]
         cfg.vertical.algo = 'xgb'
+        cfg.vertical.data_size_for_debug = 2000
 
         cfg.trainer.type = 'verticaltrainer'
         cfg.eval.freq = 5
@@ -61,14 +61,13 @@ def set_config_for_gbdt_base(self, cfg):
         cfg.model.type = 'gbdt_tree'
         cfg.model.lambda_ = 0.1
         cfg.model.gamma = 0
-        cfg.model.num_of_trees = 5
+        cfg.model.num_of_trees = 10
         cfg.model.max_tree_depth = 3
         cfg.train.optimizer.eta = 0.5
 
         cfg.data.root = 'test_data/'
         cfg.data.type = 'adult'
-        cfg.data.size = 2000
 
         cfg.dataloader.type = 'raw'
         cfg.dataloader.batch_size = 2000
@@ -78,6 +77,42 @@ def set_config_for_gbdt_base(self, cfg):
         cfg.vertical.use = True
         cfg.vertical.dims = [7, 14]
         cfg.vertical.algo = 'gbdt'
+        cfg.vertical.data_size_for_debug = 2000
+
+        cfg.trainer.type = 'verticaltrainer'
+        cfg.eval.freq = 5
+        cfg.eval.best_res_update_round_wise_key = "test_loss"
+
+        return backup_cfg
+
+    def set_config_for_rf_base(self, cfg):
+        backup_cfg = cfg.clone()
+
+        import torch
+        cfg.use_gpu = torch.cuda.is_available()
+
+        cfg.federate.mode = 'standalone'
+        cfg.federate.client_num = 2
+
+        cfg.model.type = 'random_forest'
+        cfg.model.lambda_ = 0.1
+        cfg.model.gamma = 0
+        cfg.model.num_of_trees = 10
+        cfg.model.max_tree_depth = 5
+
+        cfg.data.root = 'test_data/'
+        cfg.data.type = 'adult'
+
+        cfg.dataloader.type = 'raw'
+        cfg.dataloader.batch_size = 1500
+
+        cfg.criterion.type = 'CrossEntropyLoss'
+
+        cfg.vertical.use = True
+        cfg.vertical.dims = [7, 14]
+        cfg.vertical.algo = 'rf'
+        cfg.vertical.data_size_for_debug = 2000
+        cfg.vertical.feature_subsample_ratio = 0.5
 
         cfg.trainer.type = 'verticaltrainer'
         cfg.eval.freq = 5
@@ -97,14 +132,13 @@ def set_config_for_xgb_dp(self, cfg):
         cfg.model.type = 'xgb_tree'
         cfg.model.lambda_ = 0.1
         cfg.model.gamma = 0
-        cfg.model.num_of_trees = 5
+        cfg.model.num_of_trees = 10
         cfg.model.max_tree_depth = 3
         cfg.train.optimizer.eta = 0.5
 
         cfg.data.root = 'test_data/'
         cfg.data.type = 'adult'
-        cfg.data.size = 2000
 
         cfg.dataloader.type = 'raw'
         cfg.dataloader.batch_size = 2000
@@ -117,6 +151,7 @@ def set_config_for_xgb_dp(self, cfg):
         cfg.vertical.protect_object = 'feature_order'
         cfg.vertical.protect_method = 'dp'
         cfg.vertical.protect_args = [{'bucket_num': 100, 'epsilon': 10}]
+        cfg.vertical.data_size_for_debug = 2000
 
         cfg.trainer.type = 'verticaltrainer'
         cfg.eval.freq = 5
@@ -143,7 +178,6 @@ def set_config_for_xgb_dp_too_large_noise(self, cfg):
 
         cfg.data.root = 'test_data/'
         cfg.data.type = 'adult'
-        cfg.data.size = 2000
 
         cfg.dataloader.type = 'raw'
         cfg.dataloader.batch_size = 2000
@@ -155,7 +189,8 @@ def set_config_for_xgb_dp_too_large_noise(self, cfg):
         cfg.vertical.algo = 'xgb'
         cfg.vertical.protect_object = 'feature_order'
         cfg.vertical.protect_method = 'dp'
-        cfg.vertical.protect_args = [{'bucket_num': 100, 'epsilon': 1}]
+        cfg.vertical.protect_args = [{'bucket_num': 100, 'epsilon': 0.1}]
+        cfg.vertical.data_size_for_debug = 2000
 
         cfg.trainer.type = 'verticaltrainer'
         cfg.eval.freq = 5
@@ -175,14 +210,13 @@ def set_config_for_xgb_bucket(self, cfg):
         cfg.model.type = 'xgb_tree'
         cfg.model.lambda_ = 0.1
         cfg.model.gamma = 0
-        cfg.model.num_of_trees = 5
+        cfg.model.num_of_trees = 10
         cfg.model.max_tree_depth = 3
         cfg.train.optimizer.eta = 0.5
 
         cfg.data.root = 'test_data/'
         cfg.data.type = 'adult'
-        cfg.data.size = 2000
 
         cfg.dataloader.type = 'raw'
         cfg.dataloader.batch_size = 2000
@@ -195,6 +229,7 @@ def set_config_for_xgb_bucket(self, cfg):
         cfg.vertical.protect_object = 'feature_order'
         cfg.vertical.protect_method = 'dp'
         cfg.vertical.protect_args = [{'bucket_num': 100}]
+        cfg.vertical.data_size_for_debug = 2000
 
         cfg.trainer.type = 'verticaltrainer'
         cfg.eval.freq = 5
@@ -214,14 +249,13 @@ def set_config_for_xgb_op_boost_global(self, cfg):
         cfg.model.type = 'xgb_tree'
         cfg.model.lambda_ = 0.1
         cfg.model.gamma = 0
-        cfg.model.num_of_trees = 5
+        cfg.model.num_of_trees = 10
         cfg.model.max_tree_depth = 3
         cfg.train.optimizer.eta = 0.5
 
         cfg.data.root = 'test_data/'
         cfg.data.type = 'adult'
-        cfg.data.size = 2000
 
         cfg.dataloader.type = 'raw'
         cfg.dataloader.batch_size = 2000
@@ -234,6 +268,7 @@ def set_config_for_xgb_op_boost_global(self, cfg):
         cfg.vertical.protect_object = 'feature_order'
         cfg.vertical.protect_method = 'op_boost'
         cfg.vertical.protect_args = [{'algo': 'global'}]
+        cfg.vertical.data_size_for_debug = 2000
 
         cfg.trainer.type = 'verticaltrainer'
         cfg.eval.freq = 5
@@ -241,7 +276,7 @@ def set_config_for_xgb_op_boost_global(self, cfg):
 
         return backup_cfg
 
-    def set_config_for_xgb_op_boost_local(self, cfg):
+    def set_config_for_xgb_op_boost_adjust(self, cfg):
         backup_cfg = cfg.clone()
 
         import torch
@@ -260,7 +295,6 @@ def set_config_for_xgb_op_boost_local(self, cfg):
 
         cfg.data.root = 'test_data/'
         cfg.data.type = 'adult'
-        cfg.data.size = 2000
 
         cfg.dataloader.type = 'raw'
         cfg.dataloader.batch_size = 2000
@@ -273,6 +307,7 @@ def set_config_for_xgb_op_boost_local(self, cfg):
         cfg.vertical.protect_object = 'feature_order'
         cfg.vertical.protect_method = 'op_boost'
         cfg.vertical.protect_args = [{'algo': 'adjusting'}]
+        cfg.vertical.data_size_for_debug = 2000
 
         cfg.trainer.type = 'verticaltrainer'
         cfg.eval.freq = 5
@@ -320,7 +355,28 @@ def test_GBDT_Base(self):
         init_cfg.merge_from_other_cfg(backup_cfg)
         print(test_results)
         self.assertGreater(test_results['server_global_eval']['test_acc'],
-                           0.75)
+                           0.78)
+
+    def test_RF_Base(self):
+        init_cfg = global_cfg.clone()
+        backup_cfg = self.set_config_for_rf_base(init_cfg)
+        setup_seed(init_cfg.seed)
+        update_logger(init_cfg, True)
+
+        data, modified_config = get_data(init_cfg.clone())
+        init_cfg.merge_from_other_cfg(modified_config)
+        self.assertIsNotNone(data)
+
+        Fed_runner = get_runner(data=data,
+                                server_class=get_server_cls(init_cfg),
+                                client_class=get_client_cls(init_cfg),
+                                config=init_cfg.clone())
+        self.assertIsNotNone(Fed_runner)
+        test_results = Fed_runner.run()
+        init_cfg.merge_from_other_cfg(backup_cfg)
+        print(test_results)
+        self.assertGreater(test_results['server_global_eval']['test_acc'],
+                           0.79)
 
     def test_XGB_use_dp(self):
         init_cfg = global_cfg.clone()
@@ -361,7 +417,7 @@ def test_XGB_use_dp_too_large_noise(self):
         test_results = Fed_runner.run()
         init_cfg.merge_from_other_cfg(backup_cfg)
         print(test_results)
-        self.assertLess(test_results['server_global_eval']['test_acc'], 0.6)
+        self.assertLess(test_results['server_global_eval']['test_acc'], 0.76)
 
     def test_XGB_use_bucket(self):
         init_cfg = global_cfg.clone()
@@ -402,11 +458,12 @@ def test_XGB_use_op_boost_global(self):
         test_results = Fed_runner.run()
         init_cfg.merge_from_other_cfg(backup_cfg)
         print(test_results)
-        self.assertGreater(test_results['server_global_eval']['test_acc'], 0.7)
+        self.assertGreater(test_results['server_global_eval']['test_acc'],
+                           0.79)
 
-    def test_XGB_use_op_boost_local(self):
+    def test_XGB_use_op_boost_adjust(self):
         init_cfg = global_cfg.clone()
-        backup_cfg = self.set_config_for_xgb_op_boost_local(init_cfg)
+        backup_cfg = self.set_config_for_xgb_op_boost_adjust(init_cfg)
         setup_seed(init_cfg.seed)
         update_logger(init_cfg, True)
@@ -422,7 +479,8 @@ def test_XGB_use_op_boost_local(self):
         test_results = Fed_runner.run()
         init_cfg.merge_from_other_cfg(backup_cfg)
         print(test_results)
-        self.assertGreater(test_results['server_global_eval']['test_acc'], 0.7)
+        self.assertGreater(test_results['server_global_eval']['test_acc'],
+                           0.79)
 
 
 if __name__ == '__main__':