-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PoC] Training Restart - V2 #8337
Conversation
…TorchLightning/pytorch-lightning into refactor/logger-connector-poc
…s_everywhere_train
for more information, see https://pre-commit.ci
…s_everywhere_train
pytorch_lightning/loops/base.py
Outdated
if self.restarting: | ||
self.restore() | ||
self.restarting = False | ||
if not is_overridden("restore", self, Loop): | ||
warning_cache.warn(f"{self.__class__.__name__} Loop doesn't override the restore function.") | ||
is_restore_finished = self.restore() | ||
if is_restore_finished is None or is_restore_finished: | ||
self.restarting = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of
if restarting
self.restore()
else:
self.reset()
would it make sense to do:
self.reset()
if restarting:
self.restore()
?
Then we don't need the self._initialize()
in the subclassed loops and we can assume after reset()
the state is properly initialized and we can restore if appropritae.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
review part 1/n
this PR is integrating progress tracking
do you see a way to break it out or is it rather not so easy?
pytorch_lightning/loops/base.py
Outdated
Restore the internal state of the loop the beginning of run if restarting is ``True``. | ||
|
||
Returns: | ||
is_restore_finished: Whether the restoration actually finished. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in which case will a restoration not finish.
is this this about a success/fail return signal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just a mechanism to enable a loop to delay the restarting variable to be set to False.
In TrainingBatchLoop
, this property is required to know if we should fast-forward
the dataloaders, therefore by return False, I indicate to the loop I will manually switch off the restarting boolean later on in the loop.
pytorch_lightning/loops/base.py
Outdated
def load_state_dict(self, state_dict: Dict) -> None: | ||
"""Reload Loop state""" | ||
|
||
def get_state_dict(self, destination: Optional[OrderedDict] = None, prefix: Optional[str] = '') -> OrderedDict: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as discussed, could we consider merging several of these state_dict related APIs into one.
IMO we should aim at keeping the base loop class as simple as possible.
def restore(self) -> bool: | ||
"""Restore the loop state""" | ||
self._initialize() | ||
|
||
# restarting isn't finished yet. | ||
return False | ||
|
||
def reset(self) -> None: | ||
"""Resets the loop state""" | ||
self._initialize() | ||
|
||
self.optim_progress.optimizer.reset_on_epoch() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these _initialize() calls could be simplified as proposed in the earlier comment
self.progress.increment_completed() | ||
return super().on_advance_end() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should probably increment after the super call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done !
# handle optimization restart | ||
if self.restarting: | ||
if len(active_optimizers) > 1 and opt_idx < self.progress.current.completed: | ||
continue | ||
self.restarting = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay I see.
this is something a optimizer loop could handle for restarting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly !
@property | ||
def done(self) -> bool: | ||
"""Returns whether the training should be stopped. | ||
The criteria are that the number of steps reached the max steps, | ||
the last batch is reached or the trainer signals to stop (e.g. by early stopping). | ||
""" | ||
max_steps_reached = self.max_steps is not None and self.global_step >= self.max_steps | ||
max_steps_reached = self.max_steps is not None and (self.total_optimizer_step) >= self.max_steps |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
max steps in the trainer does not represent optimizer steps (today). It represents how many times training_step() was called (roughly). If we want to change that I think it should be done in a separate PR with a proper evaluation of what the effects are.
# share ddp pids to all processes | ||
self.share_pids() | ||
|
||
def share_pids(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leftover from a merge :)
@@ -141,6 +144,13 @@ def restore_model(self) -> None: | |||
# restore model state_dict | |||
self.trainer.training_type_plugin.load_model_state_dict(self._loaded_checkpoint) | |||
|
|||
gradients = self._loaded_checkpoint.get("gradients", None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what would we call this feature? gradient checkpointing 🤣
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the users had gradient accumulation activated.
self.trainer.fit_loop = self.on_fit_loop_init(*args, **kwargs) | ||
self.trainer.validate_loop = self.on_validate_loop_init(*args, **kwargs) | ||
self.trainer.test_loop = self.on_test_loop_init(*args, **kwargs) | ||
self.trainer.predict_loop = self.on_predict_loop_init(*args, **kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a loop connector could be useful in general, but we said we wanted to avoid Trainer state management outside the trainer. If Trainer owns loops, imo it would be safer to let the Trainer manage its state.
@@ -111,6 +115,41 @@ def auto_add_worker_init_fn(self, dataloader: DataLoader) -> None: | |||
if int(os.environ.get("PL_SEED_WORKERS", 0)) and dataloader.worker_init_fn is None: | |||
dataloader.worker_init_fn = partial(pl_worker_init_function, rank=self.global_rank) | |||
|
|||
def add_samplers_to_iterable_dataset(self, dataloader: DataLoader): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we have a little bit of code duplication from the replace_sampler
method.
We could consider sharing some code for the DataLoader attribute discovery and patching.
# detect iterable dataset | ||
contains_iterable_dataset = False | ||
|
||
def detect_iterable_dataset(dataloader: DataLoader): | ||
nonlocal contains_iterable_dataset | ||
if isinstance(dataloader.dataset, IterableDataset): | ||
contains_iterable_dataset = True | ||
|
||
apply_to_collection(loaders, DataLoader, detect_iterable_dataset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could potentially be factored out into a helper function.
obj = cls() | ||
obj.load_state_dict(state_dict) | ||
return obj | ||
|
||
|
||
class ProgressDict(Dict): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't this what a regular dict can do?
old_dict = dict(...)
new_dict = dict(old_dict)
new_dict.update(...)
partial(cycle_to_next_worker, state_dict=self.loaders_iter_state_dict, count=count) | ||
) | ||
|
||
count = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these local vars can be removed, right?
if trainer: | ||
trainer.current_iterator = it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain what this workaround is about?
I can see that you are using it in the checkpoint connector but it is not clear to me what problem it is trying to solve.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be removed ! This was used to not lose the Iterator reference as it contains the DataLoader Iterators.
@@ -995,12 +995,41 @@ def _run_train(self) -> None: | |||
if distributed_available() and self.world_size > 1: | |||
# try syncing remaing processes, kill otherwise | |||
self.training_type_plugin.reconciliate_processes(traceback.format_exc()) | |||
# save a checkpoint for fault tolerant training | |||
self.fit_loop._check_checkpoint_callback(True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@carmocca that's what I meant the other day when we talked about why we have the check_checkpoint_callback
in the first place.
this usage here was the old way Lightning would save checkpoints on interrupt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldnt manually call the ModelCheckpoint
callback like this as it will be moved to a fully hook-based callback next week.
If training is stopped and we need to save, we should do it by calling the on_train_end
hook
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding to this:
If we need to manually save a checkpoint, we can always call trainer.save_checkpoint()
and this doesn't go through the ModelCheckpoint :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer for the ModelCheckpoint to save the checkpoint as the last_model_path should be the one saved on failure.
state_dict.append(iterable_dataset_state_dict) | ||
return data["data"] | ||
out.append((k, CaptureIterableDataset._sanetize_batch_from_sampler_state(v, state_dict))) | ||
return elem_type(OrderedDict(out)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a specific reason for using OrderedDict
vs. just dict?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following apply_to_collection implementation. I believe there were a reason why OrderedDict was selected over it, but not sure why.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need further help see our docs: https://pytorch-lightning.readthedocs.io/en/latest/generated/CONTRIBUTING.html#pull-request or ask the assistance of a core contributor here or on Slack. Thank you for your contributions. |
This pull request is going to be closed. Please feel free to reopen it create a new from the actual master. |
What does this PR do?
Different Loop API compared to #8131
TODOS
[] Resolve
batch_idx
not properly reset in loops[] Add ProgressBar proper indices
[] Resolve _map_dl_idx_sampler_states should be updated after the batch is processed
[] Save extras for on epoch_end
[] Add ResultCollection to
state dict
andreload
automatically onBase Loop
[] Enable non-fault tolerant reload.
Does your PR introduce any breaking changes ? If yes, please list them.
Before submitting
PR review
Anyone in the community is free to review the PR once the tests have passed.
Before you start reviewing make sure you have read Review guidelines. In short, see the following bullet-list:
Did you have fun?
Make sure you had fun coding 🙃