diff --git a/CHANGELOG.md b/CHANGELOG.md index 0508526744c62..3fabd32416972 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ### Fixed - Fixed the order to call for world ranks & the `root_device` property in `TPUSpawnPlugin` ([#7074](https://github.com/PyTorchLightning/pytorch-lightning/pull/7074)) - +- Fixed multi-gpu join for Horovod ([#6954](https://github.com/PyTorchLightning/pytorch-lightning/pull/6954)) ## [1.2.8] - 2021-04-14 @@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). * Remove hardcoding of local rank in accelerator connector ([#6878](https://github.com/PyTorchLightning/pytorch-lightning/pull/6878)) + ## [1.2.7] - 2021-04-06 ### Fixed diff --git a/pytorch_lightning/plugins/training_type/horovod.py b/pytorch_lightning/plugins/training_type/horovod.py index 7c6c2d5525e8b..badbcbe8fcf5b 100644 --- a/pytorch_lightning/plugins/training_type/horovod.py +++ b/pytorch_lightning/plugins/training_type/horovod.py @@ -106,14 +106,14 @@ def start_training(self, trainer): self._results = trainer.run_train() # Make sure all workers have finished training before returning to the user - hvd.join() + self.join() def start_testing(self, trainer): with ExitStack(): self._results = trainer.run_test() # Make sure all workers have finished training before returning to the user - hvd.join() + self.join() def start_predicting(self, trainer): with ExitStack(): @@ -121,11 +121,11 @@ def start_predicting(self, trainer): self._results = trainer.run_predict() # Make sure all workers have finished training before returning to the user - hvd.join() + self.join() def barrier(self, *args, **kwargs): if torch_distrib.is_initialized(): - hvd.join() + self.join() def broadcast(self, obj: object, src: int = 0) -> object: obj = hvd.broadcast_object(obj, src) @@ -136,6 +136,12 @@ def model_to_device(self): torch.cuda.set_device(self.root_device) self.model.to(self.root_device) + def join(self): + if self.on_gpu: + hvd.join(self.local_rank) + else: + hvd.join() + def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None): if group is not None: raise ValueError( @@ -153,7 +159,7 @@ def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ raise ValueError(f"unrecognized `reduce_op`: {reduce_op}") # sync all processes before reduction - hvd.join() + self.join() return hvd.allreduce(output, op=reduce_op) def all_gather( @@ -173,7 +179,7 @@ def all_gather( result = result.reshape(1) # sync and gather all - hvd.join() + self.join() gathered = hvd.allgather(result) gathered_result = list(gathered.split(1, dim=0)) return gathered_result