Parallelization for standalone mode with NCCL #487

Merged: 27 commits, Mar 13, 2023
Commits (27)
7962674
standalone full paralllel
pan-x-c Oct 7, 2022
72beefa
use multi gpu
pan-x-c Oct 8, 2022
f393615
add client runner to simulate multiple clients
pan-x-c Oct 26, 2022
d709169
debug client runner client_id out of range
pan-x-c Oct 26, 2022
7a43fd8
dev: parallel training in standalone mode
xieyxclack Oct 31, 2022
afb4f5d
dev for parallel training
xieyxclack Nov 4, 2022
bf70b7f
minor fix for testing
xieyxclack Nov 4, 2022
8822f57
Standalone mode parallelization for refactored fed runner
pan-x-c Nov 4, 2022
46e2361
add StandaloneMultiprocessRunner
pan-x-c Dec 26, 2022
568dd2f
add multigpu runner
pan-x-c Dec 27, 2022
e507d77
finish standalone runner with torch ddp
pan-x-c Jan 5, 2023
4e59078
fix monitor log
pan-x-c Jan 6, 2023
13f0e23
reduce model clone times
pan-x-c Jan 9, 2023
d8bea89
fix conflicts in config
pan-x-c Jan 9, 2023
efeae7e
fix conflicts in config
pan-x-c Feb 7, 2023
7630a7f
Merge branch 'alibaba:master' into feature/pxc/torch_ddp
pan-x-c Mar 9, 2023
12d4ed4
clean multi process code
pan-x-c Mar 9, 2023
3eb5e18
clean multi process code
pan-x-c Mar 9, 2023
5c2d5d8
format code
pan-x-c Mar 9, 2023
cd435a3
debug runner builder
pan-x-c Mar 9, 2023
80a937e
debug process_num
pan-x-c Mar 9, 2023
7b6d188
debug config
pan-x-c Mar 9, 2023
2e295db
debug config
pan-x-c Mar 9, 2023
7e5fef5
debug comm_queue
pan-x-c Mar 10, 2023
0b01889
debug comm_queue
pan-x-c Mar 10, 2023
186d5ad
support auto termination in multi-gpu mode
pan-x-c Mar 13, 2023
1082ffe
add readme for standalone multigpu runner
pan-x-c Mar 13, 2023
6 changes: 3 additions & 3 deletions federatedscope/autotune/utils.py
@@ -283,11 +283,11 @@ def eval_in_fs(cfg, config=None, budget=0, client_cfgs=None, trial_index=0):
     data, modified_config = get_data(config=trial_cfg.clone())
     trial_cfg.merge_from_other_cfg(modified_config)
     trial_cfg.freeze()
-    fed_runner = get_runner(data=data,
-                            server_class=get_server_cls(trial_cfg),
+    fed_runner = get_runner(server_class=get_server_cls(trial_cfg),
                             client_class=get_client_cls(trial_cfg),
                             config=trial_cfg.clone(),
-                            client_configs=client_cfgs)
+                            client_configs=client_cfgs,
+                            data=data)
     results = fed_runner.run()

     return results
36 changes: 26 additions & 10 deletions federatedscope/core/auxiliaries/runner_builder.py
@@ -1,13 +1,17 @@
 from federatedscope.core.fed_runner import StandaloneRunner, DistributedRunner
+from federatedscope.core.parallel.parallel_runner import \
+    StandaloneMultiGPURunner


-def get_runner(data, server_class, client_class, config, client_configs=None):
+def get_runner(server_class,
+               client_class,
+               config,
+               client_configs=None,
+               data=None):
     """
     Instantiate a runner based on a configuration file

     Args:
-        data: ``core.data.StandaloneDataDict`` in standalone mode, \
-            ``core.data.ClientData`` in distribute mode
         server_class: server class
         client_class: client class
         config: configurations for FL, see ``federatedscope.core.configs``
@@ -18,21 +22,33 @@

     Note:
         The key-value pairs of built-in runner and source are shown below:
-            ===============================  ==============================
-            Mode                             Source
-            ===============================  ==============================
-            ``standalone``                   ``core.fed_runner.StandaloneRunner``
-            ``distributed``                  ``core.fed_runner.DistributedRunner``
-            ===============================  ==============================
+            =============================  ===============================
+            Mode                           Source
+            =============================  ===============================
+            ``standalone``                 ``core.fed_runner.StandaloneRunner``
+            ``distributed``                ``core.fed_runner.DistributedRunner``
+            ``standalone(process_num>1)``  ``core.auxiliaries.parallel_runner.``
+                                           ``StandaloneMultiGPURunner``
+            =============================  ===============================
     """

     mode = config.federate.mode.lower()
+    process_num = config.federate.process_num

     if mode == 'standalone':
-        runner_cls = StandaloneRunner
+        if process_num <= 1:
+            runner_cls = StandaloneRunner
+        else:
+            runner_cls = StandaloneMultiGPURunner
     elif mode == 'distributed':
         runner_cls = DistributedRunner

+    # federated dataset might change the number of clients
+    # thus, we allow the creation procedure of dataset to modify the global
+    # cfg object
+    if runner_cls is StandaloneMultiGPURunner:
+        data = None
+
     return runner_cls(data=data,
                       server_class=server_class,
                       client_class=client_class,
57 changes: 57 additions & 0 deletions federatedscope/core/communication.py
@@ -1,12 +1,19 @@
import grpc
from concurrent import futures
import logging
import torch.distributed as dist

from collections import deque

from federatedscope.core.configs.config import global_cfg
from federatedscope.core.proto import gRPC_comm_manager_pb2, \
gRPC_comm_manager_pb2_grpc
from federatedscope.core.gRPC_server import gRPCComServeFunc
from federatedscope.core.message import Message

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class StandaloneCommManager(object):
"""
@@ -39,7 +46,57 @@ def get_neighbors(self, neighbor_id=None):
        return self.neighbors

    def send(self, message):
        # All the workers share one comm_queue
        self.comm_queue.append(message)


class StandaloneDDPCommManager(StandaloneCommManager):
    """
    The communicator used for standalone mode with multigpu
    """
    def __init__(self, comm_queue, monitor=None, id2comm=None):
        super().__init__(comm_queue, monitor)
        self.id2comm = id2comm
        self.device = "cuda:{}".format(dist.get_rank())

    def _send_model_para(self, model_para, dst_rank):
        for v in model_para.values():
            t = v.to(self.device)
            dist.send(tensor=t, dst=dst_rank)

    def send(self, message):
        is_model_para = message.msg_type == 'model_para'
        is_evaluate = message.msg_type == 'evaluate'
        if self.id2comm is None:
            # client to server
            if is_model_para:
                model_para = message.content[1]
                message.content = (message.content[0], {})
                self.comm_queue.append(message) if isinstance(
                    self.comm_queue, deque) else self.comm_queue.put(message)
                self._send_model_para(model_para, 0)
            else:
                self.comm_queue.append(message) if isinstance(
                    self.comm_queue, deque) else self.comm_queue.put(message)
        else:
            receiver = message.receiver
            if not isinstance(receiver, list):
                receiver = [receiver]
            if is_model_para or is_evaluate:
                model_para = message.content
                message.content = {}
            for idx, each_comm in enumerate(self.comm_queue):
                for each_receiver in receiver:
                    if each_receiver in self.neighbors and \
                            self.id2comm[each_receiver] == idx:
                        each_comm.put(message)
                        break
                if is_model_para or is_evaluate:
                    for each_receiver in receiver:
                        if each_receiver in self.neighbors and \
                                self.id2comm[each_receiver] == idx:
                            self._send_model_para(model_para, idx + 1)
                            break
        download_bytes, upload_bytes = message.count_bytes()
        self.monitor.track_upload_bytes(upload_bytes)
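
For readers who have not used `torch.distributed` point-to-point calls before, the snippet below is a minimal, self-contained sketch (not part of this PR) of the pattern `_send_model_para` relies on: the sender pushes each tensor of a state dict with `dist.send`, and the receiving rank must post a matching `dist.recv` into a buffer of the same shape and dtype, which is why the parameters are stripped from the queued message and transferred over NCCL instead.

```python
# Hypothetical 2-GPU demo of the send/recv pattern used by _send_model_para.
# Requires torch built with NCCL support and two visible CUDA devices.
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def run(rank, world_size, reference_state):
    os.environ["MASTER_ADDR"] = "127.0.0.1"  # same defaults as cfg.federate.master_addr
    os.environ["MASTER_PORT"] = "29500"      # and cfg.federate.master_port in this PR
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    device = torch.device(f"cuda:{rank}")

    if rank == 1:
        # Sender: push each tensor to rank 0, mirroring _send_model_para.
        for v in reference_state.values():
            dist.send(tensor=v.to(device), dst=0)
    else:
        # Receiver: allocate buffers with matching shape/dtype, recv in the same order.
        for name, v in reference_state.items():
            buf = torch.empty_like(v, device=device)
            dist.recv(tensor=buf, src=1)
            print(f"rank {rank} received {name}: {tuple(buf.shape)}")

    dist.destroy_process_group()


if __name__ == "__main__":
    state = {"weight": torch.randn(4, 4), "bias": torch.zeros(4)}
    mp.spawn(run, args=(2, state), nprocs=2)
```
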
1 change: 1 addition & 0 deletions federatedscope/core/configs/README.md
@@ -228,6 +228,7 @@ The configurations related to FL settings are defined in `cfg_fl_setting.py`.
| `federate.join_in_info` | (list of string) [] | The information requirements (from server) for joining in the FL course. | We support 'num_sample/client_resource' and allow user customization.
| `federate.sampler` | (string) 'uniform' </br> Choices: {'uniform', 'group'} | The sample strategy of server used for client selection in a training round. | - |
| `federate.` </br>`resource_info_file` | (string) '' | the device information file to record computation and communication ability | - |
| `federate.process_num` | (int) 1 | The number of parallel processes. It only takes effect when `use_gpu=True`, `backend='torch'`, `federate.mode='standalone'` and `federate.share_local_model=False`, and the value must not exceed the number of GPUs. | - |
#### `distribute`: for distribute mode
| Name | (Type) Default Value | Description | Note |
|:----:|:-----:|:---------- |:---- |
19 changes: 19 additions & 0 deletions federatedscope/core/configs/cfg_fl_setting.py
@@ -2,6 +2,7 @@

from federatedscope.core.configs.config import CN
from federatedscope.register import register_config
import torch

logger = logging.getLogger(__name__)

@@ -44,6 +45,11 @@ def extend_fl_setting_cfg(cfg):
    cfg.federate.resource_info_file = ""  # the device information file to
    # record computation and communication ability

    # The configurations for parallel in standalone
    cfg.federate.process_num = 1
    cfg.federate.master_addr = '127.0.0.1'  # parameter of torch distributed
    cfg.federate.master_port = 29500  # parameter of torch distributed

    # atc (TODO: merge later)
    cfg.federate.atc_vanilla = False
    cfg.federate.atc_load_from = ''
@@ -198,6 +204,19 @@ def assert_fl_setting_cfg(cfg):
        logger.warning('Set cfg.federate.make_global_eval=True since '
                       'cfg.federate.merge_test_data=True')

    if cfg.federate.process_num > 1 and cfg.federate.mode != 'standalone':
        cfg.federate.process_num = 1
        logger.warning('Parallel training can only be used in standalone mode'
                       ', thus cfg.federate.process_num is modified to 1')
    if cfg.federate.process_num > 1 and not torch.cuda.is_available():
        cfg.federate.process_num = 1
        logger.warning(
            'No GPU found for your device, set cfg.federate.process_num=1')
    if torch.cuda.device_count() < cfg.federate.process_num:
        cfg.federate.process_num = torch.cuda.device_count()
        logger.warning(
            'We found the number of gpu is insufficient, '
            f'thus cfg.federate.process_num={cfg.federate.process_num}')
    # TODO
    if cfg.vertical.use:
        if cfg.vertical.algo == 'lr' and hasattr(cfg, "trainer") and \
4 changes: 4 additions & 0 deletions federatedscope/core/configs/yacs_config.py
@@ -533,6 +533,10 @@ def _merge_a_into_b(a, b, root, key_list):
        else:
            if root.key_is_deprecated(full_key):
                continue
            elif k in [
                    '__cfg_check_funcs__', '__help_info__', 'is_ready_for_run'
            ]:
                continue
            elif root.key_is_renamed(full_key):
                root.raise_key_rename_error(full_key)
            else:
6 changes: 6 additions & 0 deletions federatedscope/core/fed_runner.py
@@ -287,6 +287,9 @@ def check(self):

class StandaloneRunner(BaseRunner):
    def _set_up(self):
        """
        To set up server and client for standalone mode.
        """
        self.is_run_online = True if self.cfg.federate.online_aggr else False
        self.shared_comm_queue = deque()

@@ -500,6 +503,9 @@ def _run_simulation(self):

class DistributedRunner(BaseRunner):
    def _set_up(self):
        """
        To set up server or client for distributed mode.
        """
        # sample resource information
        if self.resource_info is not None:
            sampled_index = np.random.choice(list(self.resource_info.keys()))
1 change: 1 addition & 0 deletions federatedscope/core/monitors/monitor.py
@@ -19,6 +19,7 @@
torch = None

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

global_all_monitors = [
] # used in standalone mode, to merge sys metric results for all workers
85 changes: 85 additions & 0 deletions federatedscope/core/parallel/README.md
@@ -0,0 +1,85 @@
# Parallelization for standalone mode

To help developers quickly verify their algorithms, we designed and implemented `StandaloneMultiGPURunner` on top of torch distributed data parallel (DDP). The new runner makes better use of the computing resources of multiple GPUs and accelerates training in FederatedScope's standalone mode.

## When to use
Use `StandaloneMultiGPURunner` when your machine has **multiple GPUs (>=2)** and you need quick verification in **standalone mode**.


## Configuration

Add the `federate.process_num` item to the configuration file to parallelize training.

> Note: `federate.process_num` only takes effect when `use_gpu=True`, `backend='torch'`, `federate.mode='standalone'` and `federate.share_local_model=False`, and its value must not exceed the number of GPUs.

```yaml
use_gpu: True
backend: 'torch'
device: 0
early_stop:
patience: 5
seed: 12345
federate:
mode: standalone
client_num: 100
total_round_num: 20
sample_client_rate: 0.2
share_local_model: False
process_num: 4 # run 4 processes simultaneously
...
```
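
If you would rather construct the runner programmatically, the sketch below shows how `get_runner` dispatches to `StandaloneMultiGPURunner` once `process_num > 1`; it mirrors the `eval_in_fs` change in this PR. The builder import paths (`data_builder`, `worker_builder`) and the config file name are assumptions for illustration, not something fixed by this PR.

```python
# Minimal programmatic sketch; import paths for get_data/get_*_cls are assumed.
from federatedscope.core.auxiliaries.data_builder import get_data
from federatedscope.core.auxiliaries.worker_builder import get_client_cls, get_server_cls
from federatedscope.core.auxiliaries.runner_builder import get_runner
from federatedscope.core.configs.config import global_cfg

cfg = global_cfg.clone()
cfg.merge_from_file('femnist_parallel.yaml')  # hypothetical file with the options above
cfg.federate.process_num = 4                  # > 1 selects StandaloneMultiGPURunner

data, modified_cfg = get_data(config=cfg.clone())
cfg.merge_from_other_cfg(modified_cfg)

# With process_num > 1, get_runner discards the pre-built data (data = None) and
# lets each spawned process construct its own copy of the dataset.
runner = get_runner(server_class=get_server_cls(cfg),
                    client_class=get_client_cls(cfg),
                    config=cfg.clone(),
                    data=data)
results = runner.run()
```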

## Use cases

Here we give an example to demonstrate the efficiency of `StandaloneMultiGPURunner` compared to `StandaloneRunner`. The configuration file and experiment result are listed below.
The result shows that the total running time of `StandaloneMultiGPURunner` is only about one third of that of `StandaloneRunner` when 8 GPUs are used.

```yaml
use_gpu: True
device: 0
early_stop:
patience: 5
seed: 12345
federate:
mode: standalone
client_num: 100
total_round_num: 10
sample_client_rate: 0.4
share_local_model: False
# use StandaloneMultiGPURunner with 8 GPUs
process_num: 8
# use StandaloneRunner
# process_num: 1

data:
root: data/
type: femnist
splits: [0.6,0.2,0.2]
batch_size: 10
subsample: 0.05
num_workers: 0
transform: [['ToTensor'], ['Normalize', {'mean': [0.1307], 'std': [0.3081]}]]
model:
type: convnet2
hidden: 2048
out_channels: 62
train:
local_update_steps: 1
batch_or_epoch: epoch
optimizer:
lr: 0.01
weight_decay: 0.0
grad:
grad_clip: 5.0
criterion:
type: CrossEntropyLoss
trainer:
type: cvtrainer
eval:
freq: 10
metrics: ['acc', 'correct']
```

| | StandaloneMultiGPURunner | StandaloneRunner |
| :---: | :---: | :---: |
| Total running time (minute) | 0.2406 | 0.7292 |
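
To reproduce the comparison, the two runs can be launched with the usual FederatedScope entry point; the script path, the `--cfg` flag, and the trailing `key value` overrides below follow the repository's common usage and should be treated as assumptions rather than something specified in this PR.

```python
# Hypothetical launch helper; assumes federatedscope/main.py accepts --cfg plus
# yacs-style "key value" overrides, and that femnist_parallel.yaml holds the config above.
import subprocess

for process_num in (8, 1):  # 8 -> StandaloneMultiGPURunner, 1 -> StandaloneRunner
    subprocess.run(
        ["python", "federatedscope/main.py",
         "--cfg", "femnist_parallel.yaml",
         "federate.process_num", str(process_num)],
        check=True)
```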