Fix Multi-GPU join for horovod (#6954)
* fixjoin

* fix join on cpu

* fix typo

* try to undo horovod skip

* undo

* Try removing skip

* Update CHANGELOG

* add back skip for test_horovod_multi_optimizer

* Add back skip

Co-authored-by: Adrian Wälchli <[email protected]>
Co-authored-by: Carlos Mocholi <[email protected]>
2 people authored and lexierule committed Apr 22, 2021
1 parent b012593 commit afa5f0e
Showing 2 changed files with 14 additions and 7 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -10,7 +10,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
### Fixed

- Fixed the order to call for world ranks & the `root_device` property in `TPUSpawnPlugin` ([#7074](https://github.com/PyTorchLightning/pytorch-lightning/pull/7074))

+- Fixed multi-gpu join for Horovod ([#6954](https://github.com/PyTorchLightning/pytorch-lightning/pull/6954))

## [1.2.8] - 2021-04-14

@@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
* Remove hardcoding of local rank in accelerator connector ([#6878](https://github.com/PyTorchLightning/pytorch-lightning/pull/6878))



## [1.2.7] - 2021-04-06

### Fixed
18 changes: 12 additions & 6 deletions pytorch_lightning/plugins/training_type/horovod.py
@@ -106,26 +106,26 @@ def start_training(self, trainer):
            self._results = trainer.run_train()

        # Make sure all workers have finished training before returning to the user
-        hvd.join()
+        self.join()

    def start_testing(self, trainer):
        with ExitStack():
            self._results = trainer.run_test()

        # Make sure all workers have finished training before returning to the user
-        hvd.join()
+        self.join()

    def start_predicting(self, trainer):
        with ExitStack():
            # set up training routine
            self._results = trainer.run_predict()

        # Make sure all workers have finished training before returning to the user
-        hvd.join()
+        self.join()

    def barrier(self, *args, **kwargs):
        if torch_distrib.is_initialized():
-            hvd.join()
+            self.join()

    def broadcast(self, obj: object, src: int = 0) -> object:
        obj = hvd.broadcast_object(obj, src)
@@ -136,6 +136,12 @@ def model_to_device(self):
            torch.cuda.set_device(self.root_device)
        self.model.to(self.root_device)

+    def join(self):
+        if self.on_gpu:
+            hvd.join(self.local_rank)
+        else:
+            hvd.join()
+
    def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None):
        if group is not None:
            raise ValueError(
@@ -153,7 +159,7 @@ def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[
            raise ValueError(f"unrecognized `reduce_op`: {reduce_op}")

        # sync all processes before reduction
-        hvd.join()
+        self.join()
        return hvd.allreduce(output, op=reduce_op)

    def all_gather(
@@ -173,7 +179,7 @@ def all_gather(
            result = result.reshape(1)

        # sync and gather all
-        hvd.join()
+        self.join()
        gathered = hvd.allgather(result)
        gathered_result = list(gathered.split(1, dim=0))
        return gathered_result
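
For context, a minimal standalone sketch of the calling convention the new `join()` wrapper relies on: `horovod.torch.join()` accepts an optional device index, which should be the worker's local GPU when training on CUDA and can be omitted on CPU. This snippet is illustrative only and not part of the commit; the variable names are placeholders.

# Illustrative sketch (not code from this commit); assumes Horovod built with PyTorch support.
import torch
import horovod.torch as hvd

hvd.init()

if torch.cuda.is_available():
    local_rank = hvd.local_rank()
    torch.cuda.set_device(local_rank)  # pin this worker to its own GPU
    hvd.join(local_rank)               # on GPU, pass the device index, as the plugin's on_gpu branch does
else:
    hvd.join()                         # on CPU, no device argument is needed

The plugin's `join()` above performs the same branching, but uses `self.on_gpu` and `self.local_rank` rather than querying Horovod directly.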

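As a closing note, a hedged usage sketch of how this plugin is typically reached from user code in the 1.2.x series: the Horovod plugin is selected by passing accelerator='horovod' to the Trainer and launching the script with horovodrun. `MyLightningModule` is a placeholder, not something defined in this commit.

# Usage sketch for the 1.2.x-era API (illustrative only; MyLightningModule is a placeholder).
# Launch with: horovodrun -np 2 python train.py
import pytorch_lightning as pl

model = MyLightningModule()
trainer = pl.Trainer(
    accelerator="horovod",  # selects the Horovod training-type plugin
    gpus=1,                 # one GPU per Horovod process; omit for CPU runs
    max_epochs=3,
)
trainer.fit(model)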