Save ZeRO3 (partitioned) fp16 weights #882

Closed · wants to merge 2 commits
deepspeed/runtime/engine.py (7 additions, 1 deletion)

```diff
@@ -1342,10 +1342,15 @@ def all_gather_scalar(self, value):
 
     def module_state_dict(self, destination=None, prefix='', keep_vars=False):
         sd = self.module.state_dict(destination, prefix, keep_vars)
+        if self.zero_optimization_partition_weights():
+            sd = self.optimizer.save_partitioned_weights(sd)
         return sd
 
     def load_module_state_dict(self, state_dict, strict=True):
-        self.module.load_state_dict(state_dict, strict=strict)
+        if self.zero_optimization_partition_weights():
+            self.optimizer.load_partitioned_weights(state_dict)
+        else:
+            self.module.load_state_dict(state_dict, strict=strict)
 
     def _get_rank_zero_ckpt_name(self, checkpoints_path, tag, mp_rank, dp_rank):
         filename = 'zero_pp_rank_{}'.format(dp_rank)
@@ -1445,6 +1450,7 @@ def _load_checkpoint(self,
 
         self.load_module_state_dict(state_dict=checkpoint['module'],
                                     strict=load_module_strict)
+
         if self.optimizer is not None and not self.zero_optimization():
             if self.fp16_enabled():
                 self.optimizer.load_state_dict(
```
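For orientation, a minimal usage sketch of the path these hooks sit on — not part of the diff. `net` and `ds_config` are placeholders, `ds_config` is assumed to enable ZeRO stage 3, and the checkpoint directory and tag are illustrative; `save_checkpoint()`/`load_checkpoint()` are the existing engine entry points that route through the methods above:

```python
import deepspeed

# Hedged sketch: `net` is any torch.nn.Module, `ds_config` a DeepSpeed config
# dict with {"zero_optimization": {"stage": 3}} (both assumed, not from the PR).
model_engine, _, _, _ = deepspeed.initialize(model=net,
                                             model_parameters=net.parameters(),
                                             config_params=ds_config)

# Saving: under ZeRO-3, module_state_dict() now replaces each fp16 entry with
# this rank's flat partition (param.ds_tensor) before the checkpoint is written.
model_engine.save_checkpoint('checkpoints', tag='step100')

# Loading: load_module_state_dict() copies each saved shard back into
# param.ds_tensor instead of calling module.load_state_dict().
load_path, client_state = model_engine.load_checkpoint('checkpoints', tag='step100')
```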
deepspeed/runtime/zero/stage3.py (11 additions)

```diff
@@ -2832,6 +2832,17 @@ def save_checkpoint_prologue(self):
     def save_checkpoint_epilogue(self):
         self.persistent_parameters[0].all_gather(self.persistent_parameters)
 
+    def save_partitioned_weights(self, state_dict):
+        for name, param in self.module.named_parameters():
+            if name in state_dict.keys():
+                state_dict[name] = param.ds_tensor
```
stas00 (Collaborator) commented on Mar 24, 2021, on the line `state_dict[name] = param.ds_tensor`:

Found an issue here: `param.ds_tensor` at this point appears to be a flattened buffer, so the state_dict ends up being populated with 1D vectors.

Follow-up comment (Collaborator):

But we can't shape it back to the original, since we only have part of the tensor; doing something like `narrow(0, 0, param.ds_numel).view(param.ds_shape)` from `_allgather_param()` won't work, and the shape has no meaning here anyway.

So this line of logic is useful when each GPU loads `param.ds_tensor` directly, as coded in the rest of this PR.

I just tried to use it to get the partitioned fp16 weights, but now I understand that this is not possible with this approach.

Bottom line: there is no problem here; I just needed to understand that what is being saved is not a real state_dict but something like a flattened_params_state_dict.

All is good!
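To illustrate the reviewer's point, here is a minimal standalone sketch (plain PyTorch, not DeepSpeed code; the shape and the two-rank split are made up) of why a lone partition cannot be viewed back to `ds_shape`, while the gathered buffer can:

```python
import torch

# Toy stand-in for ZeRO-3 partitioning: ds_shape == (4, 6), ds_numel == 24,
# flattened and split across 2 ranks, so each "ds_tensor" is a flat 12-element slice.
full = torch.randn(4, 6)
flat = full.flatten()
shard0, shard1 = flat[:12].clone(), flat[12:].clone()

# narrow(0, 0, ds_numel).view(ds_shape), as in _allgather_param(), works only
# on the gathered buffer that holds all 24 elements:
gathered = torch.cat([shard0, shard1])
restored = gathered.narrow(0, 0, 24).view(4, 6)
assert torch.equal(restored, full)

# On a single shard it cannot work: 12 elements can't be viewed as (4, 6).
# shard0.view(4, 6)  # RuntimeError: shape '[4, 6]' is invalid for input of size 12
```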

The diff continues:

```diff
+        return state_dict
+
+    def load_partitioned_weights(self, state_dict):
+        for name, param in self.module.named_parameters():
+            if name in state_dict.keys():
+                param.ds_tensor.copy_(state_dict[name])
+
 
 def _handle_overflow(cpu_sum, x, i):
     import math
```
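One consequence worth noting (inferred from the diff, not stated in the PR): because `copy_` is in-place, each saved shard must exactly match the size of this rank's live `ds_tensor`, so such a checkpoint can presumably only be reloaded under the same world size and partitioning layout. A tiny standalone sketch with made-up sizes:

```python
import torch

# Stand-ins for the real buffers: this rank's live partition and the shard
# read back from the checkpoint. The sizes are illustrative.
ds_tensor = torch.zeros(12)       # this rank's flat fp16 partition
saved_shard = torch.randn(12)     # shard stored by save_partitioned_weights()

ds_tensor.copy_(saved_shard)      # fine: numel matches
assert torch.equal(ds_tensor, saved_shard)

# A shard saved under a different world size would not fit:
# ds_tensor.copy_(torch.randn(8))  # RuntimeError: sizes must match
```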
docs/_tutorials/getting-started.md (2 additions, 2 deletions; the changed lines appear to differ only in trailing whitespace)

````diff
@@ -265,8 +265,8 @@ local machine to discover the number of slots available. The `--include` and
 `--exclude` arguments work as normal, but the user should specify 'localhost'
 as the hostname.
 
-Also note that `CUDA_VISIBLE_DEVICES` can't be used with DeepSpeed to control
-which devices should be used. For example, to use only gpu1 of the current
+Also note that `CUDA_VISIBLE_DEVICES` can't be used with DeepSpeed to control
+which devices should be used. For example, to use only gpu1 of the current
 node, do:
 ```bash
 deepspeed --include localhost:1 ...
````
tests/unit/test_checkpointing.py (8 additions)

```diff
@@ -41,6 +41,14 @@ def compare_model_states(saved_model, loaded_model, compare_optimizer=True):
         assert id(p0) != id(p1), f'Comparing fp16 model state tensor against itself : {id(p0)} <====> {id(p1)}'
         assert torch.allclose(p0, p1, atol=1e-07), f"FP16 model state {p0} is not equal to {p1}"
 
+    # Compare ds_tensor values for ZeRO stage3
+    for p0, p1 in zip(saved_model.module.parameters(), loaded_model.module.parameters()):
+        p0_has_ds_tensor = hasattr(p0, 'ds_tensor')
+        p1_has_ds_tensor = hasattr(p1, 'ds_tensor')
+        assert p0_has_ds_tensor == p1_has_ds_tensor, f'Mismatch has ds_tensor attribute p0:{p0_has_ds_tensor}, p1:{p1_has_ds_tensor}'
+        if p0_has_ds_tensor:
+            assert torch.allclose(p0.ds_tensor, p1.ds_tensor, atol=1e-07), f'FP16 model state {p0} is not equal to {p1}'
+
     if not compare_optimizer:
         return
```